提交 909529bb 编写于 作者: H Haojun Liao

fix(query): check return value and do some internal refactor.

上级 033b2519
......@@ -122,7 +122,7 @@ typedef int32_t (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char* res
typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr);
typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr);
typedef void (*__optr_close_fn_t)(void* param, int32_t num);
typedef void (*__optr_close_fn_t)(void* param);
typedef int32_t (*__optr_explain_fn_t)(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len);
typedef struct STaskIdInfo {
......
......@@ -24,10 +24,9 @@
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"
#include "executorInt.h"
static SSDataBlock* doScanLastrow(SOperatorInfo* pOperator);
static void destroyLastrowScanOperator(void* param, int32_t numOfOutput);
static void destroyLastrowScanOperator(void* param);
static int32_t extractTargetSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds);
SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) {
......@@ -211,7 +210,7 @@ SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) {
}
}
void destroyLastrowScanOperator(void* param, int32_t numOfOutput) {
void destroyLastrowScanOperator(void* param) {
SLastrowScanInfo* pInfo = (SLastrowScanInfo*)param;
blockDataDestroy(pInfo->pRes);
taosMemoryFreeClear(param);
......
......@@ -90,13 +90,13 @@ static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock*
static void releaseQueryBuf(size_t numOfTables);
static void destroyFillOperatorInfo(void* param, int32_t numOfOutput);
static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput);
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput);
static void destroyAggOperatorInfo(void* param, int32_t numOfOutput);
static void destroyFillOperatorInfo(void* param);
static void destroyProjectOperatorInfo(void* param);
static void destroyOrderOperatorInfo(void* param);
static void destroyAggOperatorInfo(void* param);
static void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput);
static void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput);
static void destroyIntervalOperatorInfo(void* param);
static void destroyExchangeOperatorInfo(void* param);
static void destroyOperatorInfo(SOperatorInfo* pOperator);
......@@ -3424,7 +3424,7 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) {
}
if (pOperator->fpSet.closeFn != NULL) {
pOperator->fpSet.closeFn(pOperator->info, pOperator->exprSupp.numOfExprs);
pOperator->fpSet.closeFn(pOperator->info);
}
if (pOperator->pDownstream != NULL) {
......@@ -3616,7 +3616,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
return pOperator;
_error:
destroyAggOperatorInfo(pInfo, numOfCols);
destroyAggOperatorInfo(pInfo);
taosMemoryFreeClear(pOperator);
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
......@@ -3641,7 +3641,7 @@ static void freeItem(void* pItem) {
}
}
void destroyAggOperatorInfo(void* param, int32_t numOfOutput) {
void destroyAggOperatorInfo(void* param) {
SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo);
......@@ -3651,7 +3651,7 @@ void destroyAggOperatorInfo(void* param, int32_t numOfOutput) {
taosMemoryFreeClear(param);
}
void destroyFillOperatorInfo(void* param, int32_t numOfOutput) {
void destroyFillOperatorInfo(void* param) {
SFillOperatorInfo* pInfo = (SFillOperatorInfo*)param;
pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
pInfo->pRes = blockDataDestroy(pInfo->pRes);
......@@ -3667,7 +3667,7 @@ void destroyFillOperatorInfo(void* param, int32_t numOfOutput) {
taosMemoryFreeClear(param);
}
void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) {
void destroyExchangeOperatorInfo(void* param) {
SExchangeInfo* pExInfo = (SExchangeInfo*)param;
taosRemoveRef(exchangeObjRefPool, pExInfo->self);
}
......
......@@ -36,8 +36,12 @@ static void freeGroupKey(void* param) {
taosMemoryFree(pKey->pData);
}
static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) {
static void destroyGroupOperatorInfo(void* param) {
SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param;
if (pInfo == NULL) {
return;
}
cleanupBasicInfo(&pInfo->binfo);
taosMemoryFreeClear(pInfo->keyBuf);
taosArrayDestroy(pInfo->pGroupCols);
......@@ -413,7 +417,11 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
}
initResultSizeInfo(&pOperator->resultInfo, 4096);
initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, pInfo->groupKeyLen, pTaskInfo->id.str);
code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, pInfo->groupKeyLen, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
initBasicInfo(&pInfo->binfo, pResultBlock);
initResultRowInfo(&pInfo->binfo.resultRowInfo);
......@@ -426,11 +434,15 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashGroupbyAggregate, NULL, NULL, destroyGroupOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
return pOperator;
_error:
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFreeClear(pInfo);
destroyGroupOperatorInfo(pInfo);
taosMemoryFreeClear(pOperator);
return NULL;
}
......@@ -710,7 +722,7 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
return buildPartitionResult(pOperator);
}
static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) {
static void destroyPartitionOperatorInfo(void* param) {
SPartitionOperatorInfo* pInfo = (SPartitionOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo);
taosArrayDestroy(pInfo->pGroupCols);
......
......@@ -25,7 +25,7 @@
static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode);
static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator);
static void destroyMergeJoinOperator(void* param, int32_t numOfOutput);
static void destroyMergeJoinOperator(void* param);
static void extractTimeCondition(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream, int32_t numOfDownstream,
SSortMergeJoinPhysiNode* pJoinNode);
......@@ -128,12 +128,11 @@ void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) {
pColumn->scale = pColumnNode->node.resType.scale;
}
void destroyMergeJoinOperator(void* param, int32_t numOfOutput) {
void destroyMergeJoinOperator(void* param) {
SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param;
nodesDestroyNode(pJoinOperator->pCondAfterMerge);
pJoinOperator->pRes = blockDataDestroy(pJoinOperator->pRes);
taosMemoryFreeClear(param);
}
......
......@@ -23,7 +23,7 @@ static SArray* setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOf
static void setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage,
int32_t numOfExprs);
static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
static void destroyProjectOperatorInfo(void* param) {
if (NULL == param) {
return;
}
......@@ -37,10 +37,13 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
taosMemoryFreeClear(param);
}
static void destroyIndefinitOperatorInfo(void* param, int32_t numOfOutput) {
static void destroyIndefinitOperatorInfo(void* param) {
SIndefOperatorInfo* pInfo = (SIndefOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo);
if (pInfo == NULL) {
return;
}
cleanupBasicInfo(&pInfo->binfo);
taosArrayDestroy(pInfo->pPseudoColInfo);
cleanupAggSup(&pInfo->aggSup);
cleanupExprSupp(&pInfo->scalarSup);
......@@ -112,7 +115,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
return pOperator;
_error:
destroyProjectOperatorInfo(pInfo, numOfCols);
destroyProjectOperatorInfo(pInfo);
taosMemoryFree(pOperator);
pTaskInfo->code = code;
return NULL;
......@@ -371,9 +374,12 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
initResultSizeInfo(&pOperator->resultInfo, numOfRows);
initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str);
initBasicInfo(&pInfo->binfo, pResBlock);
int32_t code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
initBasicInfo(&pInfo->binfo, pResBlock);
setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr);
pInfo->binfo.pRes = pResBlock;
......@@ -389,7 +395,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, NULL,
destroyIndefinitOperatorInfo, NULL, NULL, NULL);
int32_t code = appendDownstream(pOperator, &downstream, 1);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
......@@ -397,7 +403,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
return pOperator;
_error:
taosMemoryFree(pInfo);
destroyIndefinitOperatorInfo(pInfo);
taosMemoryFree(pOperator);
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
......
......@@ -689,7 +689,7 @@ static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptr
return 0;
}
static void destroyTableScanOperatorInfo(void* param, int32_t numOfOutput) {
static void destroyTableScanOperatorInfo(void* param) {
STableScanInfo* pTableScanInfo = (STableScanInfo*)param;
blockDataDestroy(pTableScanInfo->pResBlock);
cleanupQueryTableDataCond(&pTableScanInfo->cond);
......@@ -863,7 +863,7 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
return pBlock;
}
static void destroyBlockDistScanOperatorInfo(void* param, int32_t numOfOutput) {
static void destroyBlockDistScanOperatorInfo(void* param) {
SBlockDistInfo* pDistInfo = (SBlockDistInfo*)param;
blockDataDestroy(pDistInfo->pResBlock);
tsdbReaderClose(pDistInfo->pHandle);
......@@ -1532,11 +1532,11 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNo
return NULL;
}
static void destroyStreamScanOperatorInfo(void* param, int32_t numOfOutput) {
static void destroyStreamScanOperatorInfo(void* param) {
SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param;
if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) {
STableScanInfo* pTableScanInfo = pStreamScan->pTableScanOp->info;
destroyTableScanOperatorInfo(pTableScanInfo, numOfOutput);
destroyTableScanOperatorInfo(pTableScanInfo);
taosMemoryFreeClear(pStreamScan->pTableScanOp);
}
if (pStreamScan->tqReader) {
......@@ -1692,7 +1692,7 @@ _error:
return NULL;
}
static void destroySysScanOperator(void* param, int32_t numOfOutput) {
static void destroySysScanOperator(void* param) {
SSysTableScanInfo* pInfo = (SSysTableScanInfo*)param;
tsem_destroy(&pInfo->ready);
blockDataDestroy(pInfo->pRes);
......@@ -2577,12 +2577,10 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
return (pRes->info.rows == 0) ? NULL : pInfo->pRes;
}
static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
static void destroyTagScanOperatorInfo(void* param) {
STagScanInfo* pInfo = (STagScanInfo*)param;
pInfo->pRes = blockDataDestroy(pInfo->pRes);
taosArrayDestroy(pInfo->pColMatchInfo);
taosMemoryFreeClear(param);
}
......@@ -3044,7 +3042,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
return pBlock;
}
void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) {
void destroyTableMergeScanOperatorInfo(void* param) {
STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
cleanupQueryTableDataCond(&pTableScanInfo->cond);
taosArrayDestroy(pTableScanInfo->sortSourceParams);
......
......@@ -20,7 +20,7 @@ static SSDataBlock* doSort(SOperatorInfo* pOperator);
static int32_t doOpenSortOperator(SOperatorInfo* pOperator);
static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len);
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput);
static void destroyOrderOperatorInfo(void* param);
// todo add limit/offset impl
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo) {
......@@ -250,7 +250,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
return blockDataGetNumOfRows(pBlock) > 0 ? pBlock : NULL;
}
void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
void destroyOrderOperatorInfo(void* param) {
SSortOperatorInfo* pInfo = (SSortOperatorInfo*)param;
pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes);
......@@ -468,7 +468,7 @@ int32_t getGroupSortExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, u
return TSDB_CODE_SUCCESS;
}
void destroyGroupSortOperatorInfo(void* param, int32_t numOfOutput) {
void destroyGroupSortOperatorInfo(void* param) {
SGroupSortOperatorInfo* pInfo = (SGroupSortOperatorInfo*)param;
pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes);
......@@ -685,7 +685,7 @@ SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) {
return pBlock;
}
void destroyMultiwayMergeOperatorInfo(void* param, int32_t numOfOutput) {
void destroyMultiwayMergeOperatorInfo(void* param) {
SMultiwayMergeOperatorInfo* pInfo = (SMultiwayMergeOperatorInfo*)param;
pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes);
pInfo->pInputBlock = blockDataDestroy(pInfo->pInputBlock);
......
......@@ -1664,7 +1664,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
}
static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput) {
static void destroyStateWindowOperatorInfo(void* param) {
SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo);
taosMemoryFreeClear(pInfo->stateKey.pData);
......@@ -1677,7 +1677,7 @@ static void freeItem(void* param) {
taosMemoryFree(pKey->pData);
}
void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput) {
void destroyIntervalOperatorInfo(void* param) {
SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo);
cleanupAggSup(&pInfo->aggSup);
......@@ -1694,7 +1694,7 @@ void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput) {
taosMemoryFreeClear(param);
}
void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) {
void destroyStreamFinalIntervalOperatorInfo(void* param) {
SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo);
cleanupAggSup(&pInfo->aggSup);
......@@ -1711,7 +1711,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) {
int32_t size = taosArrayGetSize(pInfo->pChildren);
for (int32_t i = 0; i < size; i++) {
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, i);
destroyStreamFinalIntervalOperatorInfo(pChildOp->info, numOfOutput);
destroyStreamFinalIntervalOperatorInfo(pChildOp->info);
taosMemoryFree(pChildOp->pDownstream);
cleanupExprSupp(&pChildOp->exprSupp);
taosMemoryFreeClear(pChildOp);
......@@ -1830,6 +1830,10 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
initResultSizeInfo(&pOperator->resultInfo, 4096);
int32_t code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
initBasicInfo(&pInfo->binfo, pResBlock);
if (isStream) {
......@@ -1849,6 +1853,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
goto _error;
}
}
pInfo->pRecycledPages = taosArrayInit(4, sizeof(int32_t));
pInfo->pDelWins = taosArrayInit(4, sizeof(SWinRes));
pInfo->delIndex = 0;
......@@ -1878,7 +1883,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
return pOperator;
_error:
destroyIntervalOperatorInfo(pInfo, numOfCols);
destroyIntervalOperatorInfo(pInfo);
taosMemoryFreeClear(pOperator);
pTaskInfo->code = code;
return NULL;
......@@ -2563,7 +2568,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
return pResBlock->info.rows == 0 ? NULL : pResBlock;
}
void destroyTimeSliceOperatorInfo(void* param, int32_t numOfOutput) {
void destroyTimeSliceOperatorInfo(void* param) {
STimeSliceOperatorInfo* pInfo = (STimeSliceOperatorInfo*)param;
pInfo->pRes = blockDataDestroy(pInfo->pRes);
......@@ -2671,7 +2676,11 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(&pOperator->resultInfo, 4096);
initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExpr, numOfCols, keyBufSize, pTaskInfo->id.str);
int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExpr, numOfCols, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
initBasicInfo(&pInfo->binfo, pResBlock);
initResultRowInfo(&pInfo->binfo.resultRowInfo);
......@@ -2692,18 +2701,27 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStateWindowAgg, NULL, NULL,
destroyStateWindowOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
int32_t code = appendDownstream(pOperator, &downstream, 1);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
return pOperator;
_error:
pTaskInfo->code = TSDB_CODE_SUCCESS;
destroyStateWindowOperatorInfo(pInfo);
taosMemoryFreeClear(pOperator);
pTaskInfo->code = code;
return NULL;
}
void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) {
void destroySWindowOperatorInfo(void* param) {
SSessionAggOperatorInfo* pInfo = (SSessionAggOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo);
if (pInfo == NULL) {
return;
}
cleanupBasicInfo(&pInfo->binfo);
colDataDestroy(&pInfo->twAggSup.timeWindowData);
cleanupAggSup(&pInfo->aggSup);
......@@ -2757,15 +2775,15 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, NULL,
destroySWindowOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
pOperator->pTaskInfo = pTaskInfo;
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
return pOperator;
_error:
if (pInfo != NULL) {
destroySWindowOperatorInfo(pInfo, numOfCols);
}
destroySWindowOperatorInfo(pInfo);
taosMemoryFreeClear(pOperator);
pTaskInfo->code = code;
return NULL;
......@@ -3328,14 +3346,16 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
initBasicInfo(&pInfo->binfo, pResBlock);
ASSERT(numOfCols > 0);
increaseTs(pOperator->exprSupp.pCtx);
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
initResultRowInfo(&pInfo->binfo.resultRowInfo);
pInfo->pChildren = NULL;
if (numOfChild > 0) {
......@@ -3401,7 +3421,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
return pOperator;
_error:
destroyStreamFinalIntervalOperatorInfo(pInfo, numOfCols);
destroyStreamFinalIntervalOperatorInfo(pInfo);
taosMemoryFreeClear(pOperator);
pTaskInfo->code = code;
return NULL;
......@@ -3439,7 +3459,7 @@ void destroyStateStreamAggSupporter(SStreamAggSupporter* pSup) {
blockDataDestroy(pSup->pScanBlock);
}
void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) {
void destroyStreamSessionAggOperatorInfo(void* param) {
SStreamSessionAggOperatorInfo* pInfo = (SStreamSessionAggOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo);
destroyStreamAggSupporter(&pInfo->streamAggSup);
......@@ -3449,7 +3469,7 @@ void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) {
for (int32_t i = 0; i < size; i++) {
SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, i);
SStreamSessionAggOperatorInfo* pChInfo = pChild->info;
destroyStreamSessionAggOperatorInfo(pChInfo, numOfOutput);
destroyStreamSessionAggOperatorInfo(pChInfo);
taosMemoryFreeClear(pChild);
}
}
......@@ -3519,7 +3539,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
if (pSessionNode->window.pExprs != NULL) {
int32_t numOfScalar = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pSessionNode->window.pExprs, NULL, &numOfScalar);
int32_t code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar);
code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
......@@ -3583,7 +3603,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
_error:
if (pInfo != NULL) {
destroyStreamSessionAggOperatorInfo(pInfo, numOfCols);
destroyStreamSessionAggOperatorInfo(pInfo);
}
taosMemoryFreeClear(pOperator);
......@@ -4411,7 +4431,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
_error:
if (pInfo != NULL) {
destroyStreamSessionAggOperatorInfo(pInfo, pOperator->exprSupp.numOfExprs);
destroyStreamSessionAggOperatorInfo(pInfo);
}
taosMemoryFreeClear(pOperator);
......@@ -4419,7 +4439,7 @@ _error:
return NULL;
}
void destroyStreamStateOperatorInfo(void* param, int32_t numOfOutput) {
void destroyStreamStateOperatorInfo(void* param) {
SStreamStateAggOperatorInfo* pInfo = (SStreamStateAggOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo);
destroyStateStreamAggSupporter(&pInfo->streamAggSup);
......@@ -4429,7 +4449,7 @@ void destroyStreamStateOperatorInfo(void* param, int32_t numOfOutput) {
for (int32_t i = 0; i < size; i++) {
SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, i);
SStreamSessionAggOperatorInfo* pChInfo = pChild->info;
destroyStreamSessionAggOperatorInfo(pChInfo, numOfOutput);
destroyStreamSessionAggOperatorInfo(pChInfo);
taosMemoryFreeClear(pChild);
taosMemoryFreeClear(pChInfo);
}
......@@ -4849,16 +4869,15 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
return pOperator;
_error:
destroyStreamStateOperatorInfo(pInfo, numOfCols);
destroyStreamStateOperatorInfo(pInfo);
taosMemoryFreeClear(pOperator);
pTaskInfo->code = code;
return NULL;
}
void destroyMergeAlignedIntervalOperatorInfo(void* param, int32_t numOfOutput) {
void destroyMergeAlignedIntervalOperatorInfo(void* param) {
SMergeAlignedIntervalAggOperatorInfo* miaInfo = (SMergeAlignedIntervalAggOperatorInfo*)param;
destroyIntervalOperatorInfo(miaInfo->intervalAggOperatorInfo, numOfOutput);
destroyIntervalOperatorInfo(miaInfo->intervalAggOperatorInfo);
taosMemoryFreeClear(param);
}
......@@ -5086,8 +5105,11 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
int32_t code =
initAggInfo(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
initBasicInfo(&iaInfo->binfo, pResBlock);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
initBasicInfo(&iaInfo->binfo, pResBlock);
initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win);
iaInfo->timeWindowInterpo = timeWindowinterpNeeded(pSup->pCtx, numOfCols, iaInfo);
......@@ -5095,10 +5117,6 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition));
}
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
initResultRowInfo(&iaInfo->binfo.resultRowInfo);
blockDataEnsureCapacity(iaInfo->binfo.pRes, pOperator->resultInfo.capacity);
......@@ -5122,7 +5140,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
return pOperator;
_error:
destroyMergeAlignedIntervalOperatorInfo(miaInfo, numOfCols);
destroyMergeAlignedIntervalOperatorInfo(miaInfo);
taosMemoryFreeClear(pOperator);
pTaskInfo->code = code;
return NULL;
......@@ -5145,10 +5163,10 @@ typedef struct SGroupTimeWindow {
STimeWindow window;
} SGroupTimeWindow;
void destroyMergeIntervalOperatorInfo(void* param, int32_t numOfOutput) {
void destroyMergeIntervalOperatorInfo(void* param) {
SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param;
tdListFree(miaInfo->groupIntervals);
destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo, numOfOutput);
destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo);
taosMemoryFreeClear(param);
}
......@@ -5392,8 +5410,11 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI
initResultSizeInfo(&pOperator->resultInfo, 4096);
int32_t code = initAggInfo(pExprSupp, &iaInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
initBasicInfo(&iaInfo->binfo, pResBlock);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
initBasicInfo(&iaInfo->binfo, pResBlock);
initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win);
iaInfo->timeWindowInterpo = timeWindowinterpNeeded(pExprSupp->pCtx, numOfCols, iaInfo);
......@@ -5426,7 +5447,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI
return pOperator;
_error:
destroyMergeIntervalOperatorInfo(miaInfo, numOfCols);
destroyMergeIntervalOperatorInfo(miaInfo);
taosMemoryFreeClear(pOperator);
pTaskInfo->code = code;
return NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册