From 909529bb7bf69de94a2d9740c7a6da9677051f4a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 25 Aug 2022 10:55:28 +0800 Subject: [PATCH] fix(query): check return value and do some internal refactor. --- source/libs/executor/inc/executorimpl.h | 2 +- source/libs/executor/src/cachescanoperator.c | 5 +- source/libs/executor/src/executorimpl.c | 22 ++-- source/libs/executor/src/groupoperator.c | 20 +++- source/libs/executor/src/joinoperator.c | 5 +- source/libs/executor/src/projectoperator.c | 22 ++-- source/libs/executor/src/scanoperator.c | 16 ++- source/libs/executor/src/sortoperator.c | 8 +- source/libs/executor/src/timewindowoperator.c | 103 +++++++++++------- 9 files changed, 119 insertions(+), 84 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index fb4eac991f..73f7781c04 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -122,7 +122,7 @@ typedef int32_t (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char* res typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr); typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr); -typedef void (*__optr_close_fn_t)(void* param, int32_t num); +typedef void (*__optr_close_fn_t)(void* param); typedef int32_t (*__optr_explain_fn_t)(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len); typedef struct STaskIdInfo { diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 94e4384b30..b31fa279e5 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -24,10 +24,9 @@ #include "tcompare.h" #include "thash.h" #include "ttypes.h" -#include "executorInt.h" static SSDataBlock* doScanLastrow(SOperatorInfo* pOperator); -static void destroyLastrowScanOperator(void* param, int32_t numOfOutput); +static void destroyLastrowScanOperator(void* param); static int32_t extractTargetSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds); SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) { @@ -211,7 +210,7 @@ SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) { } } -void destroyLastrowScanOperator(void* param, int32_t numOfOutput) { +void destroyLastrowScanOperator(void* param) { SLastrowScanInfo* pInfo = (SLastrowScanInfo*)param; blockDataDestroy(pInfo->pRes); taosMemoryFreeClear(param); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 6aaa2eb0c7..0b2b7d0220 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -90,13 +90,13 @@ static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* static void releaseQueryBuf(size_t numOfTables); -static void destroyFillOperatorInfo(void* param, int32_t numOfOutput); -static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput); -static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput); -static void destroyAggOperatorInfo(void* param, int32_t numOfOutput); +static void destroyFillOperatorInfo(void* param); +static void destroyProjectOperatorInfo(void* param); +static void destroyOrderOperatorInfo(void* param); +static void destroyAggOperatorInfo(void* param); -static void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput); -static void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput); +static void destroyIntervalOperatorInfo(void* param); +static void destroyExchangeOperatorInfo(void* param); static void destroyOperatorInfo(SOperatorInfo* pOperator); @@ -3424,7 +3424,7 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) { } if (pOperator->fpSet.closeFn != NULL) { - pOperator->fpSet.closeFn(pOperator->info, pOperator->exprSupp.numOfExprs); + pOperator->fpSet.closeFn(pOperator->info); } if (pOperator->pDownstream != NULL) { @@ -3616,7 +3616,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* return pOperator; _error: - destroyAggOperatorInfo(pInfo, numOfCols); + destroyAggOperatorInfo(pInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; return NULL; @@ -3641,7 +3641,7 @@ static void freeItem(void* pItem) { } } -void destroyAggOperatorInfo(void* param, int32_t numOfOutput) { +void destroyAggOperatorInfo(void* param) { SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); @@ -3651,7 +3651,7 @@ void destroyAggOperatorInfo(void* param, int32_t numOfOutput) { taosMemoryFreeClear(param); } -void destroyFillOperatorInfo(void* param, int32_t numOfOutput) { +void destroyFillOperatorInfo(void* param) { SFillOperatorInfo* pInfo = (SFillOperatorInfo*)param; pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo); pInfo->pRes = blockDataDestroy(pInfo->pRes); @@ -3667,7 +3667,7 @@ void destroyFillOperatorInfo(void* param, int32_t numOfOutput) { taosMemoryFreeClear(param); } -void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) { +void destroyExchangeOperatorInfo(void* param) { SExchangeInfo* pExInfo = (SExchangeInfo*)param; taosRemoveRef(exchangeObjRefPool, pExInfo->self); } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index ab2326ecae..53709c7dcc 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -36,8 +36,12 @@ static void freeGroupKey(void* param) { taosMemoryFree(pKey->pData); } -static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) { +static void destroyGroupOperatorInfo(void* param) { SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param; + if (pInfo == NULL) { + return; + } + cleanupBasicInfo(&pInfo->binfo); taosMemoryFreeClear(pInfo->keyBuf); taosArrayDestroy(pInfo->pGroupCols); @@ -413,7 +417,11 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx } initResultSizeInfo(&pOperator->resultInfo, 4096); - initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, pInfo->groupKeyLen, pTaskInfo->id.str); + code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, pInfo->groupKeyLen, pTaskInfo->id.str); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + initBasicInfo(&pInfo->binfo, pResultBlock); initResultRowInfo(&pInfo->binfo.resultRowInfo); @@ -426,11 +434,15 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashGroupbyAggregate, NULL, NULL, destroyGroupOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); code = appendDownstream(pOperator, &downstream, 1); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + return pOperator; _error: pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; - taosMemoryFreeClear(pInfo); + destroyGroupOperatorInfo(pInfo); taosMemoryFreeClear(pOperator); return NULL; } @@ -710,7 +722,7 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) { return buildPartitionResult(pOperator); } -static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) { +static void destroyPartitionOperatorInfo(void* param) { SPartitionOperatorInfo* pInfo = (SPartitionOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); taosArrayDestroy(pInfo->pGroupCols); diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 7d2b84d0f0..1bc7d458e0 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -25,7 +25,7 @@ static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode); static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator); -static void destroyMergeJoinOperator(void* param, int32_t numOfOutput); +static void destroyMergeJoinOperator(void* param); static void extractTimeCondition(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode); @@ -128,12 +128,11 @@ void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) { pColumn->scale = pColumnNode->node.resType.scale; } -void destroyMergeJoinOperator(void* param, int32_t numOfOutput) { +void destroyMergeJoinOperator(void* param) { SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param; nodesDestroyNode(pJoinOperator->pCondAfterMerge); pJoinOperator->pRes = blockDataDestroy(pJoinOperator->pRes); - taosMemoryFreeClear(param); } diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index b0ca219d52..0661ccd390 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -23,7 +23,7 @@ static SArray* setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOf static void setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage, int32_t numOfExprs); -static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) { +static void destroyProjectOperatorInfo(void* param) { if (NULL == param) { return; } @@ -37,10 +37,13 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) { taosMemoryFreeClear(param); } -static void destroyIndefinitOperatorInfo(void* param, int32_t numOfOutput) { +static void destroyIndefinitOperatorInfo(void* param) { SIndefOperatorInfo* pInfo = (SIndefOperatorInfo*)param; - cleanupBasicInfo(&pInfo->binfo); + if (pInfo == NULL) { + return; + } + cleanupBasicInfo(&pInfo->binfo); taosArrayDestroy(pInfo->pPseudoColInfo); cleanupAggSup(&pInfo->aggSup); cleanupExprSupp(&pInfo->scalarSup); @@ -112,7 +115,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys return pOperator; _error: - destroyProjectOperatorInfo(pInfo, numOfCols); + destroyProjectOperatorInfo(pInfo); taosMemoryFree(pOperator); pTaskInfo->code = code; return NULL; @@ -371,9 +374,12 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy initResultSizeInfo(&pOperator->resultInfo, numOfRows); - initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str); - initBasicInfo(&pInfo->binfo, pResBlock); + int32_t code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + initBasicInfo(&pInfo->binfo, pResBlock); setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr); pInfo->binfo.pRes = pResBlock; @@ -389,7 +395,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, NULL, destroyIndefinitOperatorInfo, NULL, NULL, NULL); - int32_t code = appendDownstream(pOperator, &downstream, 1); + code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -397,7 +403,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy return pOperator; _error: - taosMemoryFree(pInfo); + destroyIndefinitOperatorInfo(pInfo); taosMemoryFree(pOperator); pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; return NULL; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index ec902588e3..c9b8d5a377 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -689,7 +689,7 @@ static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptr return 0; } -static void destroyTableScanOperatorInfo(void* param, int32_t numOfOutput) { +static void destroyTableScanOperatorInfo(void* param) { STableScanInfo* pTableScanInfo = (STableScanInfo*)param; blockDataDestroy(pTableScanInfo->pResBlock); cleanupQueryTableDataCond(&pTableScanInfo->cond); @@ -863,7 +863,7 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) { return pBlock; } -static void destroyBlockDistScanOperatorInfo(void* param, int32_t numOfOutput) { +static void destroyBlockDistScanOperatorInfo(void* param) { SBlockDistInfo* pDistInfo = (SBlockDistInfo*)param; blockDataDestroy(pDistInfo->pResBlock); tsdbReaderClose(pDistInfo->pHandle); @@ -1532,11 +1532,11 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNo return NULL; } -static void destroyStreamScanOperatorInfo(void* param, int32_t numOfOutput) { +static void destroyStreamScanOperatorInfo(void* param) { SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param; if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) { STableScanInfo* pTableScanInfo = pStreamScan->pTableScanOp->info; - destroyTableScanOperatorInfo(pTableScanInfo, numOfOutput); + destroyTableScanOperatorInfo(pTableScanInfo); taosMemoryFreeClear(pStreamScan->pTableScanOp); } if (pStreamScan->tqReader) { @@ -1692,7 +1692,7 @@ _error: return NULL; } -static void destroySysScanOperator(void* param, int32_t numOfOutput) { +static void destroySysScanOperator(void* param) { SSysTableScanInfo* pInfo = (SSysTableScanInfo*)param; tsem_destroy(&pInfo->ready); blockDataDestroy(pInfo->pRes); @@ -2577,12 +2577,10 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { return (pRes->info.rows == 0) ? NULL : pInfo->pRes; } -static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { +static void destroyTagScanOperatorInfo(void* param) { STagScanInfo* pInfo = (STagScanInfo*)param; pInfo->pRes = blockDataDestroy(pInfo->pRes); - taosArrayDestroy(pInfo->pColMatchInfo); - taosMemoryFreeClear(param); } @@ -3044,7 +3042,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { return pBlock; } -void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) { +void destroyTableMergeScanOperatorInfo(void* param) { STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param; cleanupQueryTableDataCond(&pTableScanInfo->cond); taosArrayDestroy(pTableScanInfo->sortSourceParams); diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index dbaba98914..e2014ec973 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -20,7 +20,7 @@ static SSDataBlock* doSort(SOperatorInfo* pOperator); static int32_t doOpenSortOperator(SOperatorInfo* pOperator); static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len); -static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput); +static void destroyOrderOperatorInfo(void* param); // todo add limit/offset impl SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo) { @@ -250,7 +250,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) { return blockDataGetNumOfRows(pBlock) > 0 ? pBlock : NULL; } -void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { +void destroyOrderOperatorInfo(void* param) { SSortOperatorInfo* pInfo = (SSortOperatorInfo*)param; pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes); @@ -468,7 +468,7 @@ int32_t getGroupSortExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, u return TSDB_CODE_SUCCESS; } -void destroyGroupSortOperatorInfo(void* param, int32_t numOfOutput) { +void destroyGroupSortOperatorInfo(void* param) { SGroupSortOperatorInfo* pInfo = (SGroupSortOperatorInfo*)param; pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes); @@ -685,7 +685,7 @@ SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) { return pBlock; } -void destroyMultiwayMergeOperatorInfo(void* param, int32_t numOfOutput) { +void destroyMultiwayMergeOperatorInfo(void* param) { SMultiwayMergeOperatorInfo* pInfo = (SMultiwayMergeOperatorInfo*)param; pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes); pInfo->pInputBlock = blockDataDestroy(pInfo->pInputBlock); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 3769c57bf3..1ef191679e 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1664,7 +1664,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; } -static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput) { +static void destroyStateWindowOperatorInfo(void* param) { SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); taosMemoryFreeClear(pInfo->stateKey.pData); @@ -1677,7 +1677,7 @@ static void freeItem(void* param) { taosMemoryFree(pKey->pData); } -void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput) { +void destroyIntervalOperatorInfo(void* param) { SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); cleanupAggSup(&pInfo->aggSup); @@ -1694,7 +1694,7 @@ void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput) { taosMemoryFreeClear(param); } -void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) { +void destroyStreamFinalIntervalOperatorInfo(void* param) { SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); cleanupAggSup(&pInfo->aggSup); @@ -1711,7 +1711,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) { int32_t size = taosArrayGetSize(pInfo->pChildren); for (int32_t i = 0; i < size; i++) { SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, i); - destroyStreamFinalIntervalOperatorInfo(pChildOp->info, numOfOutput); + destroyStreamFinalIntervalOperatorInfo(pChildOp->info); taosMemoryFree(pChildOp->pDownstream); cleanupExprSupp(&pChildOp->exprSupp); taosMemoryFreeClear(pChildOp); @@ -1830,6 +1830,10 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* initResultSizeInfo(&pOperator->resultInfo, 4096); int32_t code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + initBasicInfo(&pInfo->binfo, pResBlock); if (isStream) { @@ -1849,6 +1853,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* goto _error; } } + pInfo->pRecycledPages = taosArrayInit(4, sizeof(int32_t)); pInfo->pDelWins = taosArrayInit(4, sizeof(SWinRes)); pInfo->delIndex = 0; @@ -1878,7 +1883,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* return pOperator; _error: - destroyIntervalOperatorInfo(pInfo, numOfCols); + destroyIntervalOperatorInfo(pInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = code; return NULL; @@ -2563,7 +2568,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { return pResBlock->info.rows == 0 ? NULL : pResBlock; } -void destroyTimeSliceOperatorInfo(void* param, int32_t numOfOutput) { +void destroyTimeSliceOperatorInfo(void* param) { STimeSliceOperatorInfo* pInfo = (STimeSliceOperatorInfo*)param; pInfo->pRes = blockDataDestroy(pInfo->pRes); @@ -2671,7 +2676,11 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(&pOperator->resultInfo, 4096); - initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExpr, numOfCols, keyBufSize, pTaskInfo->id.str); + int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExpr, numOfCols, keyBufSize, pTaskInfo->id.str); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + initBasicInfo(&pInfo->binfo, pResBlock); initResultRowInfo(&pInfo->binfo.resultRowInfo); @@ -2692,18 +2701,27 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStateWindowAgg, NULL, NULL, destroyStateWindowOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); - int32_t code = appendDownstream(pOperator, &downstream, 1); + code = appendDownstream(pOperator, &downstream, 1); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + return pOperator; _error: - pTaskInfo->code = TSDB_CODE_SUCCESS; + destroyStateWindowOperatorInfo(pInfo); + taosMemoryFreeClear(pOperator); + pTaskInfo->code = code; return NULL; } -void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) { +void destroySWindowOperatorInfo(void* param) { SSessionAggOperatorInfo* pInfo = (SSessionAggOperatorInfo*)param; - cleanupBasicInfo(&pInfo->binfo); + if (pInfo == NULL) { + return; + } + cleanupBasicInfo(&pInfo->binfo); colDataDestroy(&pInfo->twAggSup.timeWindowData); cleanupAggSup(&pInfo->aggSup); @@ -2757,15 +2775,15 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, NULL, destroySWindowOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); pOperator->pTaskInfo = pTaskInfo; - code = appendDownstream(pOperator, &downstream, 1); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + return pOperator; _error: - if (pInfo != NULL) { - destroySWindowOperatorInfo(pInfo, numOfCols); - } - + destroySWindowOperatorInfo(pInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = code; return NULL; @@ -3328,14 +3346,16 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + initBasicInfo(&pInfo->binfo, pResBlock); ASSERT(numOfCols > 0); increaseTs(pOperator->exprSupp.pCtx); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + initResultRowInfo(&pInfo->binfo.resultRowInfo); pInfo->pChildren = NULL; if (numOfChild > 0) { @@ -3401,7 +3421,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, return pOperator; _error: - destroyStreamFinalIntervalOperatorInfo(pInfo, numOfCols); + destroyStreamFinalIntervalOperatorInfo(pInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = code; return NULL; @@ -3439,7 +3459,7 @@ void destroyStateStreamAggSupporter(SStreamAggSupporter* pSup) { blockDataDestroy(pSup->pScanBlock); } -void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) { +void destroyStreamSessionAggOperatorInfo(void* param) { SStreamSessionAggOperatorInfo* pInfo = (SStreamSessionAggOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); destroyStreamAggSupporter(&pInfo->streamAggSup); @@ -3449,7 +3469,7 @@ void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) { for (int32_t i = 0; i < size; i++) { SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, i); SStreamSessionAggOperatorInfo* pChInfo = pChild->info; - destroyStreamSessionAggOperatorInfo(pChInfo, numOfOutput); + destroyStreamSessionAggOperatorInfo(pChInfo); taosMemoryFreeClear(pChild); } } @@ -3519,7 +3539,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh if (pSessionNode->window.pExprs != NULL) { int32_t numOfScalar = 0; SExprInfo* pScalarExprInfo = createExprInfo(pSessionNode->window.pExprs, NULL, &numOfScalar); - int32_t code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar); + code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -3583,7 +3603,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh _error: if (pInfo != NULL) { - destroyStreamSessionAggOperatorInfo(pInfo, numOfCols); + destroyStreamSessionAggOperatorInfo(pInfo); } taosMemoryFreeClear(pOperator); @@ -4411,7 +4431,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream _error: if (pInfo != NULL) { - destroyStreamSessionAggOperatorInfo(pInfo, pOperator->exprSupp.numOfExprs); + destroyStreamSessionAggOperatorInfo(pInfo); } taosMemoryFreeClear(pOperator); @@ -4419,7 +4439,7 @@ _error: return NULL; } -void destroyStreamStateOperatorInfo(void* param, int32_t numOfOutput) { +void destroyStreamStateOperatorInfo(void* param) { SStreamStateAggOperatorInfo* pInfo = (SStreamStateAggOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); destroyStateStreamAggSupporter(&pInfo->streamAggSup); @@ -4429,7 +4449,7 @@ void destroyStreamStateOperatorInfo(void* param, int32_t numOfOutput) { for (int32_t i = 0; i < size; i++) { SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, i); SStreamSessionAggOperatorInfo* pChInfo = pChild->info; - destroyStreamSessionAggOperatorInfo(pChInfo, numOfOutput); + destroyStreamSessionAggOperatorInfo(pChInfo); taosMemoryFreeClear(pChild); taosMemoryFreeClear(pChInfo); } @@ -4849,16 +4869,15 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys return pOperator; _error: - destroyStreamStateOperatorInfo(pInfo, numOfCols); + destroyStreamStateOperatorInfo(pInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = code; return NULL; } -void destroyMergeAlignedIntervalOperatorInfo(void* param, int32_t numOfOutput) { +void destroyMergeAlignedIntervalOperatorInfo(void* param) { SMergeAlignedIntervalAggOperatorInfo* miaInfo = (SMergeAlignedIntervalAggOperatorInfo*)param; - destroyIntervalOperatorInfo(miaInfo->intervalAggOperatorInfo, numOfOutput); - + destroyIntervalOperatorInfo(miaInfo->intervalAggOperatorInfo); taosMemoryFreeClear(param); } @@ -5086,8 +5105,11 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, int32_t code = initAggInfo(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); - initBasicInfo(&iaInfo->binfo, pResBlock); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + initBasicInfo(&iaInfo->binfo, pResBlock); initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win); iaInfo->timeWindowInterpo = timeWindowinterpNeeded(pSup->pCtx, numOfCols, iaInfo); @@ -5095,10 +5117,6 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition)); } - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - initResultRowInfo(&iaInfo->binfo.resultRowInfo); blockDataEnsureCapacity(iaInfo->binfo.pRes, pOperator->resultInfo.capacity); @@ -5122,7 +5140,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, return pOperator; _error: - destroyMergeAlignedIntervalOperatorInfo(miaInfo, numOfCols); + destroyMergeAlignedIntervalOperatorInfo(miaInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = code; return NULL; @@ -5145,10 +5163,10 @@ typedef struct SGroupTimeWindow { STimeWindow window; } SGroupTimeWindow; -void destroyMergeIntervalOperatorInfo(void* param, int32_t numOfOutput) { +void destroyMergeIntervalOperatorInfo(void* param) { SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param; tdListFree(miaInfo->groupIntervals); - destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo, numOfOutput); + destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo); taosMemoryFreeClear(param); } @@ -5392,8 +5410,11 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI initResultSizeInfo(&pOperator->resultInfo, 4096); int32_t code = initAggInfo(pExprSupp, &iaInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); - initBasicInfo(&iaInfo->binfo, pResBlock); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + initBasicInfo(&iaInfo->binfo, pResBlock); initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win); iaInfo->timeWindowInterpo = timeWindowinterpNeeded(pExprSupp->pCtx, numOfCols, iaInfo); @@ -5426,7 +5447,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI return pOperator; _error: - destroyMergeIntervalOperatorInfo(miaInfo, numOfCols); + destroyMergeIntervalOperatorInfo(miaInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = code; return NULL; -- GitLab