diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 6f7c43a24b8a97030df17b2c3403cdd0c9a815d4..1cbaebe3f37d34283ee1fd458c80adaa31b5a432 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -2041,6 +2041,13 @@ int tscProcessRetrieveGlobalMergeRsp(SSqlObj *pSql) { tscDebug("0x%"PRIx64" create QInfo 0x%"PRIx64" to execute query processing", pSql->self, pSql->self); pQueryInfo->pQInfo = createQInfoFromQueryNode(pQueryInfo, &tableGroupInfo, NULL, NULL, pRes->pMerger, MERGE_STAGE, pSql->self); + if (pQueryInfo->pQInfo == NULL) { + taosHashCleanup(tableGroupInfo.map); + taosArrayDestroy(group); + tscAsyncResultOnError(pSql); + pRes->code = TSDB_CODE_QRY_OUT_OF_MEMORY; + return pRes->code; + } } uint64_t localQueryId = pSql->self; diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 3a0be5e96d9d5d1518d9f2a1e833e729a0fa8b67..cb2a045a1961c4a4cfed8fea895723a6bc161c17 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -3873,8 +3873,11 @@ void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, STableGroupInfo* pTableGr STsBufInfo bufInfo = {0}; SQueryParam param = {.pOperator = pa}; - /*int32_t code = */initQInfo(&bufInfo, NULL, pSourceOperator, pQInfo, ¶m, NULL, 0, merger); + int32_t code = initQInfo(&bufInfo, NULL, pSourceOperator, pQInfo, ¶m, NULL, 0, merger); taosArrayDestroy(pa); + if (code != TSDB_CODE_SUCCESS) { + goto _cleanup; + } return pQInfo; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 8f5cdb5fd49d1a929afd4ab81fa10243ae06f415..b886f8f39b43533eb85f52e97916f615ce3ed56a 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -321,9 +321,17 @@ SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numO const static int32_t minSize = 8; SSDataBlock *res = calloc(1, sizeof(SSDataBlock)); - res->info.numOfCols = numOfOutput; + if (res == NULL) { + qError("failed to allocate for output buffer"); + goto _clean; + } res->pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData)); + if (res->pDataBlock == NULL) { + qError("failed to init arrary for data block of output buffer"); + goto _clean; + } + for (int32_t i = 0; i < numOfOutput; ++i) { SColumnInfoData idata = {{0}}; idata.info.type = pExpr[i].base.resType; @@ -332,10 +340,20 @@ SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numO int32_t size = MAX(idata.info.bytes * numOfRows, minSize); idata.pData = calloc(1, size); // at least to hold a pointer on x64 platform + if (idata.pData == NULL) { + qError("failed to allocate column buffer for output buffer"); + goto _clean; + } + taosArrayPush(res->pDataBlock, &idata); + res->info.numOfCols++; } return res; + +_clean: + destroyOutputBuf(res); + return NULL; } void* destroyOutputBuf(SSDataBlock* pBlock) { @@ -2182,23 +2200,35 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf switch (*op) { case OP_TagScan: { pRuntimeEnv->proot = createTagScanOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); + if (pRuntimeEnv->proot == NULL) { + goto _clean; + } break; } case OP_MultiTableTimeInterval: { pRuntimeEnv->proot = createMultiTableTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); + if (pRuntimeEnv->proot == NULL) { + goto _clean; + } setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); break; } case OP_AllMultiTableTimeInterval: { pRuntimeEnv->proot = createAllMultiTableTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); + if (pRuntimeEnv->proot == NULL) { + goto _clean; + } setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); break; } case OP_TimeWindow: { pRuntimeEnv->proot = createTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); + if (pRuntimeEnv->proot == NULL) { + goto _clean; + } int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; if (opType != OP_DummyInput && opType != OP_Join) { setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); @@ -2208,6 +2238,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf case OP_AllTimeWindow: { pRuntimeEnv->proot = createAllTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); + if (pRuntimeEnv->proot == NULL) { + goto _clean; + } int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; if (opType != OP_DummyInput && opType != OP_Join) { setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); @@ -2217,6 +2250,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf case OP_Groupby: { pRuntimeEnv->proot = createGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); + if (pRuntimeEnv->proot == NULL) { + goto _clean; + } int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; if (opType != OP_DummyInput) { @@ -2227,6 +2263,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf case OP_SessionWindow: { pRuntimeEnv->proot = createSWindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); + if (pRuntimeEnv->proot == NULL) { + goto _clean; + } int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; if (opType != OP_DummyInput) { setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); @@ -2236,12 +2275,18 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf case OP_MultiTableAggregate: { pRuntimeEnv->proot = createMultiTableAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); + if (pRuntimeEnv->proot == NULL) { + goto _clean; + } setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); break; } case OP_Aggregate: { pRuntimeEnv->proot = createAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); + if (pRuntimeEnv->proot == NULL) { + goto _clean; + } int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; if (opType != OP_DummyInput && opType != OP_Join) { @@ -2262,11 +2307,18 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf assert(pQueryAttr->pExpr2 != NULL); pRuntimeEnv->proot = createProjectOperatorInfo(pRuntimeEnv, prev, pQueryAttr->pExpr2, pQueryAttr->numOfExpr2); } + + if (pRuntimeEnv->proot == NULL) { + goto _clean; + } break; } case OP_StateWindow: { pRuntimeEnv->proot = createStatewindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); + if (pRuntimeEnv->proot == NULL) { + goto _clean; + } int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; if (opType != OP_DummyInput) { setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); @@ -2276,6 +2328,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf case OP_Limit: { pRuntimeEnv->proot = createLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot); + if (pRuntimeEnv->proot == NULL) { + goto _clean; + } break; } @@ -2287,12 +2342,18 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3, pQueryAttr->numOfExpr3, pColInfo, numOfFilterCols); freeColumnInfo(pColInfo, pQueryAttr->numOfExpr3); + if (pRuntimeEnv->proot == NULL) { + goto _clean; + } } else { SColumnInfo* pColInfo = extractColumnFilterInfo(pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &numOfFilterCols); pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, pColInfo, numOfFilterCols); freeColumnInfo(pColInfo, pQueryAttr->numOfOutput); + if (pRuntimeEnv->proot == NULL) { + goto _clean; + } } break; @@ -2301,11 +2362,17 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf case OP_Fill: { SOperatorInfo* pInfo = pRuntimeEnv->proot; pRuntimeEnv->proot = createFillOperatorInfo(pRuntimeEnv, pInfo, pInfo->pExpr, pInfo->numOfOutput, pQueryAttr->multigroupResult); + if (pRuntimeEnv->proot == NULL) { + goto _clean; + } break; } case OP_MultiwayMergeSort: { pRuntimeEnv->proot = createMultiwaySortOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, 4096, merger); + if (pRuntimeEnv->proot == NULL) { + goto _clean; + } break; } @@ -2317,6 +2384,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf pRuntimeEnv->proot = createGlobalAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3, pQueryAttr->numOfExpr3, merger, pQueryAttr->pUdfInfo, multigroupResult); + if (pRuntimeEnv->proot == NULL) { + goto _clean; + } break; } @@ -2324,16 +2394,25 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf int32_t num = pRuntimeEnv->proot->numOfOutput; SExprInfo* pExpr = pRuntimeEnv->proot->pExpr; pRuntimeEnv->proot = createSLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pExpr, num, merger, pQueryAttr->multigroupResult); + if (pRuntimeEnv->proot == NULL) { + goto _clean; + } break; } case OP_Distinct: { pRuntimeEnv->proot = createDistinctOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); + if (pRuntimeEnv->proot == NULL) { + goto _clean; + } break; } case OP_Order: { pRuntimeEnv->proot = createOrderOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &pQueryAttr->order); + if (pRuntimeEnv->proot == NULL) { + goto _clean; + } break; } @@ -4832,7 +4911,6 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr SQueryAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr; pQueryAttr->tsdb = tsdb; - if (tsdb != NULL) { int32_t code = setupQueryHandle(tsdb, pRuntimeEnv, pQInfo->qId, pQueryAttr->stableQuery); if (code != TSDB_CODE_SUCCESS) { @@ -4853,18 +4931,30 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr switch(tbScanner) { case OP_TableBlockInfoScan: { pRuntimeEnv->proot = createTableBlockInfoScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv); + if (pRuntimeEnv->proot == NULL) { + return TSDB_CODE_QRY_OUT_OF_MEMORY; + } break; } case OP_TableSeqScan: { pRuntimeEnv->proot = createTableSeqScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv); + if (pRuntimeEnv->proot == NULL) { + return TSDB_CODE_QRY_OUT_OF_MEMORY; + } break; } case OP_DataBlocksOptScan: { pRuntimeEnv->proot = createDataBlocksOptScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr), pQueryAttr->needReverseScan? 1:0); + if (pRuntimeEnv->proot == NULL) { + return TSDB_CODE_QRY_OUT_OF_MEMORY; + } break; } case OP_TableScan: { pRuntimeEnv->proot = createTableScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr)); + if (pRuntimeEnv->proot == NULL) { + return TSDB_CODE_QRY_OUT_OF_MEMORY; + } break; } default: { // do nothing @@ -4929,7 +5019,6 @@ static void doTableQueryInfoTimeWindowCheck(SQueryAttr* pQueryAttr, STableQueryI } } - STsdbQueryCond createTsdbQueryCond(SQueryAttr* pQueryAttr, STimeWindow* win) { STsdbQueryCond cond = { .colList = pQueryAttr->tableCols, @@ -5170,6 +5259,10 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* assert(repeatTime > 0); STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); + if (pInfo == NULL) { + return NULL; + } + pInfo->pQueryHandle = pTsdbQueryHandle; pInfo->times = repeatTime; pInfo->reverseTimes = 0; @@ -5178,6 +5271,11 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* // pInfo->prevGroupId = -1; SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL) { + tfree(pInfo); + return NULL; + } + pOperator->name = "TableScanOperator"; pOperator->operatorType = OP_TableScan; pOperator->blockingOptr = false; @@ -5192,6 +5290,9 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv) { STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); + if (pInfo == NULL) { + return NULL; + } pInfo->pQueryHandle = pTsdbQueryHandle; pInfo->times = 1; @@ -5202,6 +5303,11 @@ SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeE pRuntimeEnv->enableGroupData = true; SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL) { + tfree(pInfo); + return NULL; + } + pOperator->name = "TableSeqScanOperator"; pOperator->operatorType = OP_TableSeqScan; pOperator->blockingOptr = false; @@ -5216,9 +5322,15 @@ SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeE SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv) { STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); + if (pInfo == NULL) { + return NULL; + } pInfo->pQueryHandle = pTsdbQueryHandle; pInfo->block.pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData)); + if (pInfo->block.pDataBlock == NULL) { + goto _clean; + } SColumnInfoData infoData = {{0}}; infoData.info.type = TSDB_DATA_TYPE_BINARY; @@ -5227,6 +5339,11 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRu taosArrayPush(pInfo->block.pDataBlock, &infoData); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL) { + taosArrayDestroy(pInfo->block.pDataBlock); + goto _clean; + } + pOperator->name = "TableBlockInfoScanOperator"; pOperator->operatorType = OP_TableBlockInfoScan; pOperator->blockingOptr = false; @@ -5237,6 +5354,11 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRu pOperator->exec = doBlockInfoScan; return pOperator; + +_clean: + tfree(pInfo); + + return NULL; } void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream) { @@ -5299,6 +5421,10 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime assert(repeatTime > 0); STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); + if (pInfo == NULL) { + return NULL; + } + pInfo->pQueryHandle = pTsdbQueryHandle; pInfo->times = repeatTime; pInfo->reverseTimes = reverseTime; @@ -5306,6 +5432,11 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime pInfo->order = pRuntimeEnv->pQueryAttr->order.order; SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo)); + if (pOptr == NULL) { + tfree(pInfo); + return NULL; + } + pOptr->name = "DataBlocksOptimizedScanOperator"; pOptr->operatorType = OP_DataBlocksOptScan; pOptr->pRuntimeEnv = pRuntimeEnv; @@ -5326,6 +5457,10 @@ SArray* getOrderCheckColumns(SQueryAttr* pQuery) { pOrderColumns = taosArrayInit(4, sizeof(SColIndex)); } + if (pOrderColumns == NULL) { + return NULL; + } + if (pQuery->interval.interval > 0) { if (pOrderColumns == NULL) { pOrderColumns = taosArrayInit(1, sizeof(SColIndex)); @@ -5393,21 +5528,44 @@ static void destroyGlobalAggOperatorInfo(void* param, int32_t numOfOutput) { SMultiwayMergeInfo *pInfo = (SMultiwayMergeInfo*) param; destroyBasicOperatorInfo(&pInfo->binfo, numOfOutput); - taosArrayDestroy(pInfo->orderColumnList); - taosArrayDestroy(pInfo->groupColumnList); - tfree(pInfo->prevRow); - tfree(pInfo->currentGroupColData); + if (pInfo->orderColumnList) { + taosArrayDestroy(pInfo->orderColumnList); + } + + if (pInfo->groupColumnList) { + taosArrayDestroy(pInfo->groupColumnList); + } + + if (pInfo->prevRow) { + tfree(pInfo->prevRow); + } + + if (pInfo->currentGroupColData) { + tfree(pInfo->currentGroupColData); + } } static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) { SSLimitOperatorInfo *pInfo = (SSLimitOperatorInfo*) param; - taosArrayDestroy(pInfo->orderColumnList); - pInfo->pRes = destroyOutputBuf(pInfo->pRes); - tfree(pInfo->prevRow); + + if (pInfo->orderColumnList) { + taosArrayDestroy(pInfo->orderColumnList); + } + + if (pInfo->pRes) { + pInfo->pRes = destroyOutputBuf(pInfo->pRes); + } + + if (pInfo->prevRow) { + tfree(pInfo->prevRow); + } } SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo, bool groupResultMixedUp) { SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo)); + if (pInfo == NULL) { + return NULL; + } pInfo->resultRowFactor = (int32_t)(getRowNumForMultioutput(pRuntimeEnv->pQueryAttr, pRuntimeEnv->pQueryAttr->topBotQuery, false)); @@ -5423,6 +5581,10 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, pInfo->orderColumnList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr); pInfo->groupColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr); + if (pInfo->binfo.pRes == NULL || pInfo->binfo.pCtx == NULL || pInfo->orderColumnList == NULL || pInfo->groupColumnList == NULL) { + goto _clean; + } + // TODO refactor int32_t len = 0; for(int32_t i = 0; i < numOfOutput; ++i) { @@ -5442,6 +5604,10 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, numOfCols = (pInfo->groupColumnList != NULL)? (int32_t)taosArrayGetSize(pInfo->groupColumnList):0; pInfo->currentGroupColData = calloc(1, (POINTER_BYTES * numOfCols + len)); + if (pInfo->currentGroupColData == NULL) { + goto _clean; + } + offset = POINTER_BYTES * numOfCols; for(int32_t i = 0; i < numOfCols; ++i) { @@ -5452,11 +5618,18 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, } initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); + if (pInfo->binfo.resultRowInfo.pResult == NULL) { + goto _clean; + } pInfo->seed = rand(); setDefaultOutputBuf(pRuntimeEnv, &pInfo->binfo, pInfo->seed, MERGE_STAGE); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL) { + goto _clean; + } + pOperator->name = "GlobalAggregate"; pOperator->operatorType = OP_GlobalAggregate; pOperator->blockingOptr = true; @@ -5471,17 +5644,30 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, appendUpstream(pOperator, upstream); return pOperator; + +_clean: + destroyGlobalAggOperatorInfo((void *) pInfo, numOfOutput); + tfree(pInfo); + + return NULL; } SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SExprInfo *pExpr, int32_t numOfOutput, int32_t numOfRows, void *merger) { SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo)); + if (pInfo == NULL) { + return NULL; + } pInfo->pMerge = merger; pInfo->bufCapacity = numOfRows; pInfo->orderColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows); + if (pInfo->orderColumnList == NULL || pInfo->binfo.pRes == NULL) { + goto _clean; + } + { // todo extract method to create prev compare buffer int32_t len = 0; for(int32_t i = 0; i < numOfOutput; ++i) { @@ -5490,6 +5676,9 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx int32_t numOfCols = (pInfo->orderColumnList != NULL)? (int32_t) taosArrayGetSize(pInfo->orderColumnList):0; pInfo->prevRow = calloc(1, (POINTER_BYTES * numOfCols + len)); + if (pInfo->prevRow == NULL) { + goto _clean; + } int32_t offset = POINTER_BYTES * numOfCols; for(int32_t i = 0; i < numOfCols; ++i) { @@ -5501,6 +5690,10 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx } SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL) { + goto _clean; + } + pOperator->name = "MultiwaySortOperator"; pOperator->operatorType = OP_MultiwayMergeSort; pOperator->blockingOptr = false; @@ -5512,6 +5705,12 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx pOperator->exec = doMultiwayMergeSort; pOperator->cleanup = destroyGlobalAggOperatorInfo; return pOperator; + +_clean: + destroyGlobalAggOperatorInfo((void *)pInfo, numOfOutput); + tfree(pInfo); + + return NULL; } static int32_t doMergeSDatablock(SSDataBlock* pDest, SSDataBlock* pSrc) { @@ -5588,11 +5787,22 @@ static SSDataBlock* doSort(void* param, bool* newgroup) { SOperatorInfo *createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrderVal* pOrderVal) { SOrderOperatorInfo* pInfo = calloc(1, sizeof(SOrderOperatorInfo)); + if (pInfo == NULL) { + return NULL; + } { SSDataBlock* pDataBlock = calloc(1, sizeof(SSDataBlock)); + if (pDataBlock == NULL) { + goto _clean; + } + pDataBlock->pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData)); - for(int32_t i = 0; i < numOfOutput; ++i) { + if (pDataBlock->pDataBlock == NULL) { + goto _clean; + } + + for (int32_t i = 0; i < numOfOutput; ++i) { SColumnInfoData col = {{0}}; col.info.colId = pExpr[i].base.colInfo.colId; col.info.bytes = pExpr[i].base.colBytes; @@ -5610,6 +5820,10 @@ SOperatorInfo *createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI } SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL) { + goto _clean; + } + pOperator->name = "InMemoryOrder"; pOperator->operatorType = OP_Order; pOperator->blockingOptr = true; @@ -5621,6 +5835,12 @@ SOperatorInfo *createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI appendUpstream(pOperator, upstream); return pOperator; + +_clean: + destroyOrderOperatorInfo((void *)pInfo, numOfOutput); + tfree(pInfo); + + return NULL; } static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) { @@ -5889,11 +6109,10 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) { int32_t skip = 0; int32_t remain = 0; int64_t srows = tsdbSkipOffset(pRuntimeEnv->pQueryHandle); - + if (pRuntimeEnv->currentOffset == 0) { break; - } - else if(srows > 0) { + } else if(srows > 0) { if(pRuntimeEnv->currentOffset - srows >= pBlock->info.rows) { pRuntimeEnv->currentOffset -= pBlock->info.rows; } else { @@ -5908,7 +6127,7 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) { skip = (int32_t)pRuntimeEnv->currentOffset; remain = (int32_t)(pBlock->info.rows - pRuntimeEnv->currentOffset); } - + // need move if(move) { pBlock->info.rows = remain; @@ -6606,6 +6825,9 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) { SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); + if (pInfo == NULL) { + return NULL; + } SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; int32_t numOfRows = (int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery)); @@ -6615,10 +6837,18 @@ SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); + if (pInfo->binfo.pRes == NULL || pInfo->binfo.pCtx == NULL || pInfo->binfo.resultRowInfo.pResult == NULL) { + goto _clean; + } + pInfo->seed = rand(); setDefaultOutputBuf(pRuntimeEnv, &pInfo->binfo, pInfo->seed, MASTER_SCAN); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL) { + goto _clean; + } + pOperator->name = "TableAggregate"; pOperator->operatorType = OP_Aggregate; pOperator->blockingOptr = true; @@ -6633,31 +6863,53 @@ SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera appendUpstream(pOperator, upstream); return pOperator; + +_clean: + destroyAggOperatorInfo((void *)pInfo, numOfOutput); + tfree(pInfo); + + return NULL; } static void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput) { assert(pInfo != NULL); - destroySQLFunctionCtx(pInfo->pCtx, numOfOutput); - tfree(pInfo->rowCellInfoOffset); + if (pInfo->pCtx) { + destroySQLFunctionCtx(pInfo->pCtx, numOfOutput); + } + + if (pInfo->rowCellInfoOffset) { + tfree(pInfo->rowCellInfoOffset); + } - cleanupResultRowInfo(&pInfo->resultRowInfo); - pInfo->pRes = destroyOutputBuf(pInfo->pRes); + if (pInfo->resultRowInfo.pResult) { + cleanupResultRowInfo(&pInfo->resultRowInfo); + } + + if (pInfo->pRes) { + pInfo->pRes = destroyOutputBuf(pInfo->pRes); + } } static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) { SOptrBasicInfo* pInfo = (SOptrBasicInfo*) param; doDestroyBasicInfo(pInfo, numOfOutput); } + static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput) { SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*) param; doDestroyBasicInfo(&pInfo->binfo, numOfOutput); - tfree(pInfo->prevData); + + if (pInfo->prevData) { + tfree(pInfo->prevData); + } } + static void destroyAggOperatorInfo(void* param, int32_t numOfOutput) { SAggOperatorInfo* pInfo = (SAggOperatorInfo*) param; doDestroyBasicInfo(&pInfo->binfo, numOfOutput); } + static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) { SSWindowOperatorInfo* pInfo = (SSWindowOperatorInfo*) param; doDestroyBasicInfo(&pInfo->binfo, numOfOutput); @@ -6665,15 +6917,26 @@ static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) { static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) { SFillOperatorInfo* pInfo = (SFillOperatorInfo*) param; - pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo); - pInfo->pRes = destroyOutputBuf(pInfo->pRes); - tfree(pInfo->p); + if (pInfo->pFillInfo) { + pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo); + } + + if (pInfo->pRes) { + pInfo->pRes = destroyOutputBuf(pInfo->pRes); + } + + if (pInfo->p) { + tfree(pInfo->p); + } } static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) { SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*) param; doDestroyBasicInfo(&pInfo->binfo, numOfOutput); - tfree(pInfo->prevData); + + if (pInfo->prevData) { + tfree(pInfo->prevData); + } } static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) { @@ -6683,12 +6946,16 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) { static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { STagScanInfo* pInfo = (STagScanInfo*) param; - pInfo->pRes = destroyOutputBuf(pInfo->pRes); + if (pInfo->pRes) { + pInfo->pRes = destroyOutputBuf(pInfo->pRes); + } } static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { SOrderOperatorInfo* pInfo = (SOrderOperatorInfo*) param; - pInfo->pDataBlock = destroyOutputBuf(pInfo->pDataBlock); + if (pInfo->pDataBlock) { + pInfo->pDataBlock = destroyOutputBuf(pInfo->pDataBlock); + } } static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) { @@ -6698,22 +6965,45 @@ static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) { static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) { SDistinctOperatorInfo* pInfo = (SDistinctOperatorInfo*) param; - taosHashCleanup(pInfo->pSet); - tfree(pInfo->buf); - taosArrayDestroy(pInfo->pDistinctDataInfo); - pInfo->pRes = destroyOutputBuf(pInfo->pRes); + if (pInfo->pSet) { + taosHashCleanup(pInfo->pSet); + } + + if (pInfo->buf) { + tfree(pInfo->buf); + } + + if (pInfo->pDistinctDataInfo) { + taosArrayDestroy(pInfo->pDistinctDataInfo); + } + + if (pInfo->pRes) { + pInfo->pRes = destroyOutputBuf(pInfo->pRes); + } } SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); + if (pInfo == NULL) { + return NULL; + } size_t tableGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, (int32_t) tableGroup); pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); + initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)tableGroup, TSDB_DATA_TYPE_INT); + if (pInfo->binfo.pRes == NULL || pInfo->binfo.pCtx == NULL || pInfo->binfo.resultRowInfo.pResult == NULL) { + goto _clean; + } + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL) { + goto _clean; + } + pOperator->name = "MultiTableAggregate"; pOperator->operatorType = OP_MultiTableAggregate; pOperator->blockingOptr = true; @@ -6728,10 +7018,19 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SO appendUpstream(pOperator, upstream); return pOperator; + +_clean: + destroyAggOperatorInfo((void *)pInfo, numOfOutput); + tfree(pInfo); + + return NULL; } SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SProjectOperatorInfo* pInfo = calloc(1, sizeof(SProjectOperatorInfo)); + if (pInfo == NULL) { + return NULL; + } pInfo->seed = rand(); pInfo->bufCapacity = pRuntimeEnv->resultInfo.capacity; @@ -6741,9 +7040,18 @@ SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato pBInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pBInfo->rowCellInfoOffset); initResultRowInfo(&pBInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); + + if (pInfo->binfo.pRes == NULL || pInfo->binfo.pCtx == NULL || pBInfo->resultRowInfo.pResult == NULL) { + goto _clean; + } + setDefaultOutputBuf(pRuntimeEnv, pBInfo, pInfo->seed, MASTER_SCAN); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL) { + goto _clean; + } + pOperator->name = "ProjectOperator"; pOperator->operatorType = OP_Project; pOperator->blockingOptr = false; @@ -6758,6 +7066,12 @@ SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato appendUpstream(pOperator, upstream); return pOperator; + +_clean: + destroyProjectOperatorInfo((void *)pInfo, numOfOutput); + tfree(pInfo); + + return NULL; } SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int32_t* numOfFilterCols) { @@ -6792,12 +7106,18 @@ SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int3 SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter) { SFilterOperatorInfo* pInfo = calloc(1, sizeof(SFilterOperatorInfo)); + if (pInfo == NULL) { + return NULL; + } assert(numOfFilter > 0 && pCols != NULL); doCreateFilterInfo(pCols, numOfOutput, numOfFilter, &pInfo->pFilterInfo, 0); pInfo->numOfFilterCols = numOfFilter; SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL) { + goto _clean; + } pOperator->name = "FilterOperator"; pOperator->operatorType = OP_Filter; @@ -6812,13 +7132,27 @@ SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator appendUpstream(pOperator, upstream); return pOperator; + +_clean: + destroyConditionOperatorInfo((void *)pInfo, numOfOutput); + tfree(pInfo); + + return NULL; } SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) { SLimitOperatorInfo* pInfo = calloc(1, sizeof(SLimitOperatorInfo)); + if (pInfo == NULL) { + return NULL; + } + pInfo->limit = pRuntimeEnv->pQueryAttr->limit.limit; SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL) { + tfree(pInfo); + return NULL; + } pOperator->name = "LimitOperator"; pOperator->operatorType = OP_Limit; @@ -6834,12 +7168,23 @@ SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); + if (pInfo == NULL) { + return NULL; + } pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); + initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); + if (pInfo->pRes == NULL || pInfo->pCtx == NULL || pInfo->resultRowInfo.pResult == NULL) { + goto _clean; + } + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL) { + goto _clean; + } pOperator->name = "TimeIntervalAggOperator"; pOperator->operatorType = OP_TimeWindow; @@ -6854,17 +7199,34 @@ SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp appendUpstream(pOperator, upstream); return pOperator; + +_clean: + destroyBasicOperatorInfo((void *)pInfo, numOfOutput); + tfree(pInfo); + + return NULL; } SOperatorInfo* createAllTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); + if (pInfo == NULL) { + return NULL; + } pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); + initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); + if (pInfo->pRes == NULL || pInfo->pCtx == NULL || pInfo->resultRowInfo.pResult == NULL) { + goto _clean; + } + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL) { + goto _clean; + } pOperator->name = "AllTimeIntervalAggOperator"; pOperator->operatorType = OP_AllTimeWindow; @@ -6879,17 +7241,36 @@ SOperatorInfo* createAllTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, appendUpstream(pOperator, upstream); return pOperator; + +_clean: + destroyBasicOperatorInfo((void *)pInfo, numOfOutput); + tfree(pInfo); + + return NULL; } SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SStateWindowOperatorInfo* pInfo = calloc(1, sizeof(SStateWindowOperatorInfo)); + if (pInfo == NULL) { + return NULL; + } + pInfo->colIndex = -1; pInfo->reptScan = false; pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); + initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); + if (pInfo->binfo.pRes == NULL || pInfo->binfo.pCtx == NULL || pInfo->binfo.resultRowInfo.pResult == NULL) { + goto _clean; + } + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL) { + goto _clean; + } + pOperator->name = "StateWindowOperator"; pOperator->operatorType = OP_StateWindow; pOperator->blockingOptr = true; @@ -6903,17 +7284,35 @@ SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe appendUpstream(pOperator, upstream); return pOperator; + +_clean: + destroyStateWindowOperatorInfo((void *)pInfo, numOfOutput); + tfree(pInfo); + + return NULL; } + SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SSWindowOperatorInfo* pInfo = calloc(1, sizeof(SSWindowOperatorInfo)); + if (pInfo == NULL) { + return NULL; + } pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); + initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); + if (pInfo->binfo.pRes == NULL || pInfo->binfo.pCtx == NULL || pInfo->binfo.resultRowInfo.pResult == NULL) { + goto _clean; + } + pInfo->prevTs = INT64_MIN; pInfo->reptScan = false; SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL) { + goto _clean; + } pOperator->name = "SessionWindowAggOperator"; pOperator->operatorType = OP_SessionWindow; @@ -6928,16 +7327,34 @@ SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato appendUpstream(pOperator, upstream); return pOperator; + +_clean: + destroySWindowOperatorInfo((void *)pInfo, numOfOutput); + tfree(pInfo); + + return NULL; } SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); + if (pInfo == NULL) { + return NULL; + } pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); + initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); + if (pInfo->pRes == NULL || pInfo->pCtx == NULL || pInfo->resultRowInfo.pResult == NULL) { + goto _clean; + } + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL) { + goto _clean; + } + pOperator->name = "MultiTableTimeIntervalOperator"; pOperator->operatorType = OP_MultiTableTimeInterval; pOperator->blockingOptr = true; @@ -6952,16 +7369,34 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRunti appendUpstream(pOperator, upstream); return pOperator; + +_clean: + destroyBasicOperatorInfo((void *)pInfo, numOfOutput); + tfree(pInfo); + + return NULL; } SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); + if (pInfo == NULL) { + return NULL; + } pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); + initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); + if (pInfo->pRes == NULL || pInfo->pCtx == NULL || pInfo->resultRowInfo.pResult == NULL) { + goto _clean; + } + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL) { + goto _clean; + } + pOperator->name = "AllMultiTableTimeIntervalOperator"; pOperator->operatorType = OP_AllMultiTableTimeInterval; pOperator->blockingOptr = true; @@ -6977,14 +7412,22 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRu appendUpstream(pOperator, upstream); return pOperator; + +_clean: + destroyBasicOperatorInfo((void *)pInfo, numOfOutput); + tfree(pInfo); + + return NULL; } SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SGroupbyOperatorInfo* pInfo = calloc(1, sizeof(SGroupbyOperatorInfo)); - pInfo->colIndex = -1; // group by column index - + if (pInfo == NULL) { + return NULL; + } + pInfo->colIndex = -1; // group by column index pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; @@ -6993,9 +7436,18 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato (int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery))); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); + initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); + if (pInfo->binfo.pCtx == NULL || pInfo->binfo.pRes == NULL || pInfo->binfo.resultRowInfo.pResult == NULL) { + goto _clean; + } + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL) { + goto _clean; + } + pOperator->name = "GroupbyAggOperator"; pOperator->blockingOptr = true; pOperator->status = OP_IN_EXECUTING; @@ -7009,16 +7461,34 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato appendUpstream(pOperator, upstream); return pOperator; + +_clean: + destroyGroupbyOperatorInfo((void *)pInfo, numOfOutput); + tfree(pInfo); + + return NULL; } SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult) { SFillOperatorInfo* pInfo = calloc(1, sizeof(SFillOperatorInfo)); + if (pInfo == NULL) { + return NULL; + } + pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); + if (pInfo->pRes == NULL) { + goto _clean; + } + pInfo->multigroupResult = multigroupResult; { SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfOutput, pQueryAttr->fillVal); + if (pColInfo == NULL) { + goto _clean; + } + STimeWindow w = TSWINDOW_INITIALIZER; TSKEY sk = MIN(pQueryAttr->window.skey, pQueryAttr->window.ekey); @@ -7029,11 +7499,20 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn taosCreateFillInfo(pQueryAttr->order.order, w.skey, 0, (int32_t)pRuntimeEnv->resultInfo.capacity, numOfOutput, pQueryAttr->interval.sliding, pQueryAttr->interval.slidingUnit, (int8_t)pQueryAttr->precision, pQueryAttr->fillType, pColInfo, pRuntimeEnv->qinfo); + if (pInfo->pFillInfo == NULL) { + goto _clean; + } pInfo->p = calloc(pInfo->pFillInfo->numOfCols, POINTER_BYTES); + if (pInfo->p == NULL) { + goto _clean; + } } SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL) { + goto _clean; + } pOperator->name = "FillOperator"; pOperator->blockingOptr = false; @@ -7048,14 +7527,27 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn appendUpstream(pOperator, upstream); return pOperator; + +_clean: + destroySFillOperatorInfo((void *)pInfo, numOfOutput); + tfree(pInfo); + + return NULL; } SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* pMerger, bool multigroupResult) { SSLimitOperatorInfo* pInfo = calloc(1, sizeof(SSLimitOperatorInfo)); + if (pInfo == NULL) { + return NULL; + } SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; pInfo->orderColumnList = getResultGroupCheckColumns(pQueryAttr); + if (pInfo->orderColumnList == NULL) { + goto _clean; + } + pInfo->slimit = pQueryAttr->slimit; pInfo->limit = pQueryAttr->limit; pInfo->capacity = pRuntimeEnv->resultInfo.capacity; @@ -7072,6 +7564,9 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator int32_t numOfCols = (pInfo->orderColumnList != NULL)? (int32_t) taosArrayGetSize(pInfo->orderColumnList):0; pInfo->prevRow = calloc(1, (POINTER_BYTES * numOfCols + len)); + if (pInfo->prevRow == NULL) { + goto _clean; + } int32_t offset = POINTER_BYTES * numOfCols; for(int32_t i = 0; i < numOfCols; ++i) { @@ -7083,7 +7578,14 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); + if (pInfo->pRes == NULL) { + goto _clean; + } + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL) { + goto _clean; + } pOperator->name = "SLimitOperator"; pOperator->operatorType = OP_SLimit; @@ -7096,6 +7598,12 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator appendUpstream(pOperator, upstream); return pOperator; + +_clean: + destroySlimitOperatorInfo((void *)pInfo, numOfOutput); + tfree(pInfo); + + return NULL; } static SSDataBlock* doTagScan(void* param, bool* newgroup) { @@ -7226,8 +7734,16 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) { SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput) { STagScanInfo* pInfo = calloc(1, sizeof(STagScanInfo)); + if (pInfo == NULL) { + return NULL; + } + pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); + if (pInfo->pRes == NULL) { + goto _clean; + } + size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv); assert(numOfGroup == 0 || numOfGroup == 1); @@ -7235,6 +7751,10 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf pInfo->curPos = 0; SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL) { + goto _clean; + } + pOperator->name = "SeqTableTagScan"; pOperator->operatorType = OP_TagScan; pOperator->blockingOptr = false; @@ -7247,7 +7767,14 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf pOperator->cleanup = destroyTagScanOperatorInfo; return pOperator; + +_clean: + destroyTagScanOperatorInfo((void *)pInfo, numOfOutput); + tfree(pInfo); + + return NULL; } + static bool initMultiDistinctInfo(SDistinctOperatorInfo *pInfo, SOperatorInfo* pOperator, SSDataBlock *pBlock) { if (taosArrayGetSize(pInfo->pDistinctDataInfo) == pOperator->numOfOutput) { // distinct info already inited @@ -7361,6 +7888,9 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) { SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SDistinctOperatorInfo* pInfo = calloc(1, sizeof(SDistinctOperatorInfo)); + if (pInfo == NULL) { + return NULL; + } pInfo->totalBytes = 0; pInfo->buf = NULL; @@ -7370,7 +7900,15 @@ SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperat pInfo->pSet = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, (int32_t) pInfo->outputCapacity); + if (pInfo->pDistinctDataInfo == NULL || pInfo->pSet == NULL || pInfo->pRes == NULL) { + goto _clean; + } + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL) { + goto _clean; + } + pOperator->name = "DistinctOperator"; pOperator->blockingOptr = false; pOperator->status = OP_IN_EXECUTING; @@ -7385,6 +7923,12 @@ SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperat appendUpstream(pOperator, upstream); return pOperator; + +_clean: + destroyDistinctOperatorInfo((void *)pInfo, numOfOutput); + tfree(pInfo); + + return NULL; } static int32_t getColumnIndexInSource(SQueriedTableInfo *pTableInfo, SSqlExpr *pExpr, SColumnInfo* pTagCols) { @@ -8543,15 +9087,15 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S goto _cleanup; } - // calc skipOffset + // calc skipOffset if(pQueryMsg->offset > 0 && TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_PROJECTION_QUERY) && pQueryAttr->stableQuery == false) { pQueryAttr->skipOffset = true; for (int32_t i = 0; i < pQueryAttr->numOfCols; ++i) { - if (pQueryAttr->tableCols[i].flist.numOfFilters > 0 + if (pQueryAttr->tableCols[i].flist.numOfFilters > 0 && pQueryAttr->tableCols[i].colId != PRIMARYKEY_TIMESTAMP_COL_INDEX) { - pQueryAttr->skipOffset = false; - break; + pQueryAttr->skipOffset = false; + break; } } } diff --git a/src/query/src/qFill.c b/src/query/src/qFill.c index d999f10ce23d2246b67d07415f1c3a50f9819ea0..a70bd2d45a274f8688564d934bdc910cd4a27f3b 100644 --- a/src/query/src/qFill.c +++ b/src/query/src/qFill.c @@ -353,6 +353,10 @@ SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int3 } SFillInfo* pFillInfo = calloc(1, sizeof(SFillInfo)); + if (pFillInfo == NULL) { + return NULL; + } + taosResetFillInfo(pFillInfo, skey); pFillInfo->order = order; @@ -370,6 +374,10 @@ SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int3 pFillInfo->interval.slidingUnit = slidingUnit; pFillInfo->pData = malloc(POINTER_BYTES * numOfCols); + if (pFillInfo->pData == NULL) { + tfree(pFillInfo); + return NULL; + } // if (numOfTags > 0) { pFillInfo->pTags = calloc(numOfCols, sizeof(SFillTagColInfo));