diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index fc489f08af05b02937bfb9f3a4f11b127fd62e47..b3c5565cfb63bde7c5b659d929e6d634e693f7a8 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -438,21 +438,18 @@ int taos_init() { } int taos_options_imp(TSDB_OPTION option, const char *str) { - if (option != TSDB_OPTION_CONFIGDIR) { - taos_init(); // initialize global config - } else { + if (option == TSDB_OPTION_CONFIGDIR) { tstrncpy(configDir, str, PATH_MAX); tscInfo("set cfg:%s to %s", configDir, str); return 0; + } else { + taos_init(); // initialize global config } SConfig *pCfg = taosGetCfg(); SConfigItem *pItem = NULL; switch (option) { - case TSDB_OPTION_CONFIGDIR: - pItem = cfgGetItem(pCfg, "configDir"); - break; case TSDB_OPTION_SHELL_ACTIVITY_TIMER: pItem = cfgGetItem(pCfg, "shellActivityTimer"); break; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index b494140da59e9ac410c1c6bb649894b30edb6d2d..df717bda23abb8a887afd80e9cfe03a9e7a7d121 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1130,7 +1130,8 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM break; } - if (NULL != pRequest && TSDB_CODE_SUCCESS != code) { + // TODO weired responding code? + if (TSDB_CODE_SUCCESS != code) { pRequest->code = terrno; } } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 1da5297396254fb8708a1f1d3c4ee80d14e7b4f0..3f88d478e292df6f092a774416f8c8865ac015e9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -645,11 +645,11 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN int32_t total = pBlockNum->numOfLastFiles + pBlockNum->numOfBlocks; double el = (taosGetTimestampUs() - st) / 1000.0; - tsdbDebug( - "load block of %"PRIzu" tables completed, blocks:%d in %d tables, last-files:%d, block-info-size:%.2f Kb, elapsed " - "time:%.2f ms %s", - numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastFiles, sizeInDisk / 1000.0, el, - pReader->idStr); + tsdbDebug("load block of %" PRIzu + " tables completed, blocks:%d in %d tables, last-files:%d, block-info-size:%.2f Kb, elapsed " + "time:%.2f ms %s", + numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastFiles, sizeInDisk / 1000.0, el, + pReader->idStr); pReader->cost.numOfBlocks += total; pReader->cost.headFileLoadTime += el; @@ -1579,7 +1579,10 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* if (pReader->order == TSDB_ORDER_ASC) { if (minKey == key) { init = true; - tRowMergerInit(&merge, &fRow, pReader->pSchema); + int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema); + if (code != TSDB_CODE_SUCCESS) { + return code; + } doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); } @@ -1589,7 +1592,10 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* tRowMerge(&merge, &fRow1); } else { init = true; - tRowMergerInit(&merge, &fRow1, pReader->pSchema); + int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge); } @@ -1600,16 +1606,29 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* } else { init = true; STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); - tRowMergerInit(&merge, pRow, pSchema); + int32_t code = tRowMergerInit(&merge, pRow, pSchema); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + } + int32_t code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; } - doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); } } else { if (minKey == k.ts) { init = true; STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); - tRowMergerInit(&merge, pRow, pSchema); - doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); + int32_t code = tRowMergerInit(&merge, pRow, pSchema); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } if (minKey == tsLast) { @@ -1618,7 +1637,10 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* tRowMerge(&merge, &fRow1); } else { init = true; - tRowMergerInit(&merge, &fRow1, pReader->pSchema); + int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge); } @@ -1628,7 +1650,10 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* tRowMerge(&merge, &fRow); } else { init = true; - tRowMergerInit(&merge, &fRow, pReader->pSchema); + int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); } @@ -1662,13 +1687,16 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, if (tryCopyDistinctRowFromSttBlock(&fRow, pLastBlockReader, pBlockScanInfo, tsLastBlock, pReader)) { return TSDB_CODE_SUCCESS; } else { - tRowMergerInit(&merge, &fRow, pReader->pSchema); + int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema); + if (code != TSDB_CODE_SUCCESS) { + return code; + } TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); tRowMerge(&merge, &fRow1); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge); - int32_t code = tRowMergerGetRow(&merge, &pTSRow); + code = tRowMergerGetRow(&merge, &pTSRow); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1679,7 +1707,11 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, tRowMergerClear(&merge); } } else { // not merge block data - tRowMergerInit(&merge, &fRow, pReader->pSchema); + int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge); ASSERT(mergeBlockData); @@ -1688,7 +1720,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); } - int32_t code = tRowMergerGetRow(&merge, &pTSRow); + code = tRowMergerGetRow(&merge, &pTSRow); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1724,7 +1756,11 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader STSRow* pTSRow = NULL; SRowMerger merge = {0}; - tRowMergerInit(&merge, &fRow, pReader->pSchema); + int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); @@ -1732,7 +1768,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge); - int32_t code = tRowMergerGetRow(&merge, &pTSRow); + code = tRowMergerGetRow(&merge, &pTSRow); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1756,9 +1792,9 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) { - SRowMerger merge = {0}; - STSRow* pTSRow = NULL; - + SRowMerger merge = {0}; + STSRow* pTSRow = NULL; + int32_t code = TSDB_CODE_SUCCESS; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SArray* pDelList = pBlockScanInfo->delSkyline; @@ -1850,12 +1886,17 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* } else { init = true; STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid); - int32_t code = tRowMergerInit(&merge, piRow, pSchema); + code = tRowMergerInit(&merge, piRow, pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } } - doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader); + + code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, + &merge, pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } if (minKey == k.ts) { @@ -1863,23 +1904,31 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* tRowMerge(&merge, pRow); } else { STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); - int32_t code = tRowMergerInit(&merge, pRow, pSchema); + code = tRowMergerInit(&merge, pRow, pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } } - doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); + code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, + pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } } else { if (minKey == k.ts) { init = true; STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); - int32_t code = tRowMergerInit(&merge, pRow, pSchema); + code = tRowMergerInit(&merge, pRow, pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } - doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); + code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, + pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } if (minKey == ik.ts) { @@ -1888,12 +1937,16 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* } else { init = true; STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid); - int32_t code = tRowMergerInit(&merge, piRow, pSchema); + code = tRowMergerInit(&merge, piRow, pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } } - doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader); + code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, + pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } if (minKey == tsLast) { @@ -1924,7 +1977,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* } } - int32_t code = tRowMergerGetRow(&merge, &pTSRow); + code = tRowMergerGetRow(&merge, &pTSRow); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2085,9 +2138,13 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc STSRow* pTSRow = NULL; SRowMerger merge = {0}; - tRowMergerInit(&merge, &fRow, pReader->pSchema); + int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); - int32_t code = tRowMergerGetRow(&merge, &pTSRow); + code = tRowMergerGetRow(&merge, &pTSRow); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2925,6 +2982,10 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDe } STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, uid); + if (pTSchema == NULL) { + return terrno; + } + tRowMergerAdd(pMerger, pRow, pTSchema); } @@ -3094,13 +3155,17 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, } STSchema* pTSchema1 = doGetSchemaForTSRow(TSDBROW_SVERSION(pNextRow), pReader, uid); - if(pTSchema1 == NULL) { + if (pTSchema1 == NULL) { return terrno; } tRowMergerAdd(&merge, pNextRow, pTSchema1); - doMergeRowsInBuf(pIter, uid, current.pTSRow->ts, pDelList, &merge, pReader); + code = doMergeRowsInBuf(pIter, uid, current.pTSRow->ts, pDelList, &merge, pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + code = tRowMergerGetRow(&merge, pTSRow); if (code != TSDB_CODE_SUCCESS) { return code; @@ -3121,19 +3186,44 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p if (ASCENDING_TRAVERSE(pReader->order)) { // ascending order imem --> mem STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); - tRowMergerInit(&merge, piRow, pSchema); - doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader); + int32_t code = tRowMergerInit(&merge, piRow, pSchema); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, + pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } tRowMerge(&merge, pRow); - doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); + code = + doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + } else { STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); - tRowMergerInit(&merge, pRow, pSchema); - doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); + int32_t code = tRowMergerInit(&merge, pRow, pSchema); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + code = + doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } tRowMerge(&merge, piRow); - doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader); + code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, + pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } int32_t code = tRowMergerGetRow(&merge, pTSRow); @@ -3457,18 +3547,18 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl STsdbReader* pPrevReader = pReader->innerReader[0]; STsdbReader* pNextReader = pReader->innerReader[1]; - // we need only one row - pPrevReader->capacity = 1; - pPrevReader->status.pTableMap = pReader->status.pTableMap; - pPrevReader->pSchema = pReader->pSchema; - pPrevReader->pMemSchema = pReader->pMemSchema; - pPrevReader->pReadSnap = pReader->pReadSnap; - - pNextReader->capacity = 1; - pNextReader->status.pTableMap = pReader->status.pTableMap; - pNextReader->pSchema = pReader->pSchema; - pNextReader->pMemSchema = pReader->pMemSchema; - pNextReader->pReadSnap = pReader->pReadSnap; + // we need only one row + pPrevReader->capacity = 1; + pPrevReader->status.pTableMap = pReader->status.pTableMap; + pPrevReader->pSchema = pReader->pSchema; + pPrevReader->pMemSchema = pReader->pMemSchema; + pPrevReader->pReadSnap = pReader->pReadSnap; + + pNextReader->capacity = 1; + pNextReader->status.pTableMap = pReader->status.pTableMap; + pNextReader->pSchema = pReader->pSchema; + pNextReader->pMemSchema = pReader->pMemSchema; + pNextReader->pReadSnap = pReader->pReadSnap; code = doOpenReaderImpl(pPrevReader); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index f99c83238198f23586707b8e09ddb5ff4f2da1f4..79f6166079b4701e2399203cfd6bdd0f2d73b444 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -122,7 +122,8 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, in if (order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC) { __compar_fn_t fn = (order == TSDB_ORDER_ASC) ? resultrowComparAsc : resultrowComparDesc; - taosSort(pGroupResInfo->pRows->pData, taosArrayGetSize(pGroupResInfo->pRows), POINTER_BYTES, fn); + int32_t size = POINTER_BYTES; + taosSort(pGroupResInfo->pRows->pData, taosArrayGetSize(pGroupResInfo->pRows), size, fn); } pGroupResInfo->index = 0; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 0f4118257b4ec16f96c26d2c3bfd6a344d183d68..0d861c1fb94a93ad9524993ea582f558b941e19e 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -276,6 +276,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo code = getGroupIdFromTagsVal(pScanInfo->readHandle.meta, keyInfo.uid, pScanInfo->pGroupTags, keyBuf, &keyInfo.groupId); if (code != TSDB_CODE_SUCCESS) { + taosMemoryFree(keyBuf); return code; } } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index aaef411d2dfef41d6e1dc2c62280cafe18726d9d..a7e98527f4763ccc23dd2f1b58362cd036286d2a 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -97,11 +97,10 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator); void doSetOperatorCompleted(SOperatorInfo* pOperator) { pOperator->status = OP_EXEC_DONE; + ASSERT(pOperator->pTaskInfo != NULL); pOperator->cost.totalCost = (taosGetTimestampUs() - pOperator->pTaskInfo->cost.start * 1000) / 1000.0; - if (pOperator->pTaskInfo != NULL) { - setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); - } + setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); } int32_t operatorDummyOpenFn(SOperatorInfo* pOperator) { @@ -1092,13 +1091,13 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColM } SFilterInfo* filter = pFilterInfo; - int64_t st = taosGetTimestampUs(); + int64_t st = taosGetTimestampUs(); -// pError("start filter"); + // pError("start filter"); // todo move to the initialization function int32_t code = 0; - bool needFree = false; + bool needFree = false; if (filter == NULL) { needFree = true; code = filterInitFromNode((SNode*)pFilterNode, &filter, 0); @@ -1848,6 +1847,7 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (NULL == pMsgSendInfo) { taosMemoryFreeClear(pMsg); + taosMemoryFree(pWrapper); qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo)); pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; return pTaskInfo->code; @@ -3094,8 +3094,12 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* } return pOperator; + _error: - destroyAggOperatorInfo(pInfo); + if (pInfo != NULL) { + destroyAggOperatorInfo(pInfo); + } + taosMemoryFreeClear(pOperator); pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; return NULL; @@ -3242,13 +3246,16 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* goto _error; } - SSDataBlock* pResBlock = createResDataBlock(pPhyFillNode->node.pOutputDataBlockDesc); - SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pInfo->numOfExpr); + pInfo->pRes = createResDataBlock(pPhyFillNode->node.pOutputDataBlockDesc); + SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pInfo->numOfExpr); + pOperator->exprSupp.pExprInfo = pExprInfo; + pInfo->pNotFillExprInfo = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &pInfo->numOfNotFillExpr); int32_t code = createWStartTsAsNotFillExpr(pInfo, pPhyFillNode); if (code != TSDB_CODE_SUCCESS) { goto _error; } + SInterval* pInterval = QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType ? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval @@ -3258,16 +3265,20 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* int32_t type = convertFillType(pPhyFillNode->mode); SResultInfo* pResultInfo = &pOperator->resultInfo; + initResultSizeInfo(&pOperator->resultInfo, 4096); - blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity); - initExprSupp(&pOperator->exprSupp, pExprInfo, pInfo->numOfExpr); + blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); + code = initExprSupp(&pOperator->exprSupp, pExprInfo, pInfo->numOfExpr); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } pInfo->primaryTsCol = ((STargetNode*)pPhyFillNode->pWStartTs)->slotId; pInfo->primarySrcSlotId = ((SColumnNode*)((STargetNode*)pPhyFillNode->pWStartTs)->pExpr)->slotId; int32_t numOfOutputCols = 0; - SArray* pColMatchColInfo = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc, - &numOfOutputCols, COL_MATCH_FROM_SLOT_ID); + pInfo->pColMatchColInfo = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc, + &numOfOutputCols, COL_MATCH_FROM_SLOT_ID); code = initFillInfo(pInfo, pExprInfo, pInfo->numOfExpr, pInfo->pNotFillExprInfo, pInfo->numOfNotFillExpr, (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange, pResultInfo->capacity, @@ -3276,17 +3287,14 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* goto _error; } - pInfo->pRes = pResBlock; - pInfo->pFinalRes = createOneDataBlock(pResBlock, false); + pInfo->pFinalRes = createOneDataBlock(pInfo->pRes, false); blockDataEnsureCapacity(pInfo->pFinalRes, pOperator->resultInfo.capacity); pInfo->pCondition = pPhyFillNode->node.pConditions; - pInfo->pColMatchColInfo = pColMatchColInfo; pOperator->name = "FillOperator"; pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL; - pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->exprSupp.numOfExprs = pInfo->numOfExpr; pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; @@ -3298,8 +3306,11 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* return pOperator; _error: + if (pInfo != NULL) { + destroyFillOperatorInfo(pInfo); + } + taosMemoryFreeClear(pOperator); - taosMemoryFreeClear(pInfo); return NULL; } @@ -3388,7 +3399,7 @@ SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) { pSchema->colId = pColNode->colId; pSchema->type = pColNode->node.resType.type; pSchema->bytes = pColNode->node.resType.bytes; - strncpy(pSchema->name, pColNode->colName, tListLen(pSchema->name)); + tstrncpy(pSchema->name, pColNode->colName, tListLen(pSchema->name)); } } diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index fe6ee4129a9a0d0bd6dec2eb3e2fddc5452880da..ddb6d73aab2671cf8deb7ccbc867137b3a7fbd9f 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -384,6 +384,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy numOfRows = TWOMB / pResBlock->info.rowSize; } + initBasicInfo(&pInfo->binfo, pResBlock); initResultSizeInfo(&pOperator->resultInfo, numOfRows); int32_t code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str); @@ -391,7 +392,6 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy goto _error; } - initBasicInfo(&pInfo->binfo, pResBlock); setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr); pInfo->binfo.pRes = pResBlock; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index c6bd3532efcd6b802934eb1d296009453527e4b7..8e1858d8c0d33bd764c053f6eff21cc17bd0df4e 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1770,6 +1770,7 @@ FETCH_NEXT_BLOCK: pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT; printDataBlock(pDelBlock, "stream scan delete result"); if (pInfo->pDeleteDataRes->info.rows > 0) { + blockDataDestroy(pDelBlock); return pInfo->pDeleteDataRes; } else { goto FETCH_NEXT_BLOCK; @@ -3001,7 +3002,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { while (1) { int64_t startTs = taosGetTimestampUs(); tstrncpy(pInfo->req.tb, tNameGetTableName(&pInfo->name), tListLen(pInfo->req.tb)); - strcpy(pInfo->req.user, pInfo->pUser); + tstrncpy(pInfo->req.user, pInfo->pUser, tListLen(pInfo->req.user)); int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req); char* buf1 = taosMemoryCalloc(1, contLen); @@ -3332,6 +3333,11 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags const char* idStr) { int64_t st = taosGetTimestampUs(); + if (pHandle == NULL) { + qError("invalid handle, in creating operator tree", idStr); + return TSDB_CODE_INVALID_PARA; + } + int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanNode, pTagCond, pTagIndexCond, pTableListInfo); if (code != TSDB_CODE_SUCCESS) { qError("failed to getTableList, code: %s", tstrerror(code)); diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 0d5f52103420dfe874d6e20514e1891db55a2362..7ca3de5214cd18202b5dfe2293bcc495e6b08190 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -398,15 +398,16 @@ int32_t finishSortGroup(SOperatorInfo* pOperator) { SGroupSortOperatorInfo* pInfo = pOperator->info; SSortExecInfo sortExecInfo = tsortGetSortExecInfo(pInfo->pCurrSortHandle); + pInfo->sortExecInfo.sortMethod = sortExecInfo.sortMethod; pInfo->sortExecInfo.sortBuffer = sortExecInfo.sortBuffer; pInfo->sortExecInfo.loops += sortExecInfo.loops; pInfo->sortExecInfo.readBytes += sortExecInfo.readBytes; pInfo->sortExecInfo.writeBytes += sortExecInfo.writeBytes; - if (pInfo->pCurrSortHandle != NULL) { - tsortDestroySortHandle(pInfo->pCurrSortHandle); - } + + tsortDestroySortHandle(pInfo->pCurrSortHandle); pInfo->pCurrSortHandle = NULL; + return TSDB_CODE_SUCCESS; } @@ -717,10 +718,9 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size SMultiwayMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMultiwayMergeOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SDataBlockDescNode* pDescNode = pPhyNode->pOutputDataBlockDesc; - SSDataBlock* pResBlock = createResDataBlock(pDescNode); - - int32_t rowSize = pResBlock->info.rowSize; + pInfo->binfo.pRes = createResDataBlock(pDescNode); + int32_t rowSize = pInfo->binfo.pRes->info.rowSize; if (pInfo == NULL || pOperator == NULL || rowSize > 100 * 1024 * 1024) { goto _error; } @@ -734,7 +734,6 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size initResultSizeInfo(&pOperator->resultInfo, 1024); pInfo->groupSort = pMergePhyNode->groupSort; - pInfo->binfo.pRes = pResBlock; pInfo->pSortInfo = pSortInfo; pInfo->pColMatchInfo = pColMatchColInfo; pInfo->pInputBlock = pInputBlock; @@ -761,7 +760,10 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size _error: pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; - taosMemoryFree(pInfo); + if (pInfo != NULL) { + destroyMultiwayMergeOperatorInfo(pInfo); + } + taosMemoryFree(pOperator); return NULL; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index cbbd874bb911c2df82f54cc6eecc7e2956d6e362..199306ed3617962fbb3aebb6915059795dfac622 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1480,7 +1480,7 @@ static void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, STimeWindowAgg for (int32_t i = 0; i < pBlock->info.rows; i++) { TSKEY startTs = TMAX(tsStarts[i], pTwSup->minTs); TSKEY endTs = TMIN(tsEnds[i], pTwSup->maxTs); - SResultRowInfo dumyInfo; + SResultRowInfo dumyInfo = {0}; dumyInfo.cur.pageId = -1; STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTs, pInterval, TSDB_ORDER_ASC); do { @@ -1506,8 +1506,9 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval* SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); uint64_t* pGpDatas = (uint64_t*)pGpCol->pData; for (int32_t i = 0; i < pBlock->info.rows; i++) { - SResultRowInfo dumyInfo; + SResultRowInfo dumyInfo = {0}; dumyInfo.cur.pageId = -1; + STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCols[i], pInterval, TSDB_ORDER_ASC); while (win.ekey <= endTsCols[i]) { uint64_t winGpId = pGpDatas[i]; @@ -2748,11 +2749,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi goto _error; } - int32_t num = 0; - SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num); - SSDataBlock* pResBlock = createResDataBlock(pStateNode->window.node.pOutputDataBlockDesc); int32_t tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId; - SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr; if (pStateNode->window.pExprs != NULL) { @@ -2775,6 +2772,10 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; + int32_t num = 0; + SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num); + SSDataBlock* pResBlock = createResDataBlock(pStateNode->window.node.pOutputDataBlockDesc); + initResultSizeInfo(&pOperator->resultInfo, 4096); int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { @@ -3618,14 +3619,13 @@ void destroyStreamSessionAggOperatorInfo(void* param) { int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock) { + initBasicInfo(pBasicInfo, pResultBlock); int32_t code = initExprSupp(pSup, pExprInfo, numOfCols); if (code != TSDB_CODE_SUCCESS) { return code; } - initStreamFunciton(pSup->pCtx, pSup->numOfExprs); - - initBasicInfo(pBasicInfo, pResultBlock); + initStreamFunciton(pSup->pCtx, pSup->numOfExprs); for (int32_t i = 0; i < numOfCols; ++i) { pSup->pCtx[i].saveHandle.pBuf = NULL; } @@ -3665,9 +3665,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh SExecTaskInfo* pTaskInfo) { SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; int32_t numOfCols = 0; - SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols); - SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); int32_t code = TSDB_CODE_OUT_OF_MEMORY; + SStreamSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamSessionAggOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -3685,8 +3684,11 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh goto _error; } } + SExprSupp* pSup = &pOperator->exprSupp; + SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols); + SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); code = initBasicInfoEx(&pInfo->binfo, pSup, pExprInfo, numOfCols, pResBlock); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -3701,8 +3703,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh if (pInfo->pDummyCtx == NULL) { goto _error; } - initDummyFunction(pInfo->pDummyCtx, pSup->pCtx, numOfCols); + initDummyFunction(pInfo->pDummyCtx, pSup->pCtx, numOfCols); pInfo->twAggSup = (STimeWindowAggSupp){ .waterMark = pSessionNode->window.watermark, .calTrigger = pSessionNode->window.triggerType, @@ -5004,14 +5006,14 @@ int32_t initStateAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFu SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo) { SStreamStateWinodwPhysiNode* pStateNode = (SStreamStateWinodwPhysiNode*)pPhyNode; - SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); int32_t tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId; SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr; - int32_t code = TSDB_CODE_OUT_OF_MEMORY; + int32_t code = TSDB_CODE_SUCCESS; SStreamStateAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamStateAggOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; goto _error; } @@ -5025,7 +5027,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys if (pStateNode->window.pExprs != NULL) { int32_t numOfScalar = 0; SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalar); - int32_t code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar); + code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -5040,6 +5042,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys }; initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); + SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); code = initBasicInfoEx(&pInfo->binfo, pSup, pExprInfo, numOfCols, pResBlock); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -5615,7 +5618,6 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge int32_t num = 0; SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num); - SSDataBlock* pResBlock = createResDataBlock(pIntervalPhyNode->window.node.pOutputDataBlockDesc); SInterval interval = {.interval = pIntervalPhyNode->interval, .sliding = pIntervalPhyNode->sliding, @@ -5644,6 +5646,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge goto _error; } + SSDataBlock* pResBlock = createResDataBlock(pIntervalPhyNode->window.node.pOutputDataBlockDesc); initBasicInfo(&pIntervalInfo->binfo, pResBlock); initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win); @@ -5675,7 +5678,10 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge return pOperator; _error: - destroyMergeIntervalOperatorInfo(pMergeIntervalInfo); + if (pMergeIntervalInfo != NULL) { + destroyMergeIntervalOperatorInfo(pMergeIntervalInfo); + } + taosMemoryFreeClear(pOperator); pTaskInfo->code = code; return NULL; @@ -5827,8 +5833,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired; pInfo->isFinal = false; - - pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; initResultSizeInfo(&pOperator->resultInfo, 4096); SExprSupp* pSup = &pOperator->exprSupp; @@ -5837,8 +5841,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys initStreamFunciton(pSup->pCtx, pSup->numOfExprs); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); - size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - int32_t code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); + size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; + int32_t code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } diff --git a/source/libs/executor/src/tlinearhash.c b/source/libs/executor/src/tlinearhash.c index 824e988c2f8ac388ff59576eb424697a57eed3b8..b133041fdce88bcb63338a6ba1b7f03628d9f158 100644 --- a/source/libs/executor/src/tlinearhash.c +++ b/source/libs/executor/src/tlinearhash.c @@ -220,7 +220,7 @@ static int32_t doAddNewBucket(SLHashObj* pHashObj) { pHashObj->pBucket[pHashObj->numOfBuckets] = pBucket; pBucket->pPageIdList = taosArrayInit(2, sizeof(int32_t)); - if (pBucket->pPageIdList == NULL || pBucket == NULL) { + if (pBucket->pPageIdList == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -256,6 +256,7 @@ SLHashObj* tHashInit(int32_t inMemPages, int32_t pageSize, _hash_fn_t fn, int32_ int32_t code = createDiskbasedBuf(&pHashObj->pBuf, pageSize, inMemPages * pageSize, "", tsTempDir); if (code != 0) { + taosMemoryFree(pHashObj); terrno = code; return NULL; } @@ -351,7 +352,7 @@ int32_t tHashPut(SLHashObj* pHashObj, const void* key, size_t keyLen, void* data char* pStart = p->data; while (pStart - ((char*)p) < p->num) { SLHashNode* pNode = (SLHashNode*)pStart; - ASSERT(pNode->keyLen > 0 && pNode->dataLen >= 0); + ASSERT(pNode->keyLen > 0); char* k = GET_LHASH_NODE_KEY(pNode); int32_t hashv = pHashObj->hashFn(k, pNode->keyLen); diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 74da5c45ee1f56b552c3f95f8bfbffd95994606a..85582cbd39d1083b08482a2de1146cf5d91391a9 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -548,7 +548,8 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { SSDataBlock* pBlock = createOneDataBlock(pHandle->pDataBlock, false); code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId, pPageIdList); - if (code != 0) { + if (code != TSDB_CODE_SUCCESS) { + taosArrayDestroy(pResList); return code; } }