From 004a84ac81afcd945a87faf10a191238d040a22e Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 2 Sep 2022 16:41:32 +0800 Subject: [PATCH] fix memory leak --- source/dnode/vnode/src/tq/tq.c | 5 +- source/dnode/vnode/src/vnd/vnodeSvr.c | 3 +- source/libs/executor/src/executor.c | 6 +- source/libs/function/src/builtinsimpl.c | 74 ++++++++++++------------- 4 files changed, 46 insertions(+), 42 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index f8fac97710..f9c2757c37 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -829,8 +829,6 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { tDecoderInit(pCoder, pReq, len); tDecodeDeleteRes(pCoder, pRes); - /*ASSERT(pRes->skey != 0);*/ - /*ASSERT(pRes->ekey != 0);*/ tDecoderClear(pCoder); int32_t sz = taosArrayGetSize(pRes->uidList); @@ -859,6 +857,8 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { colDataAppendNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX), i); } + taosArrayDestroy(pRes->uidList); + void* pIter = NULL; while (1) { pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter); @@ -890,6 +890,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { streamTaskInputFail(pTask); } } + blockDataDestroy(pDelBlock); return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 96e8e44bc9..2724d4cbfd 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -1107,8 +1107,7 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq tDecoderInit(pCoder, pReq, len); tDecodeDeleteRes(pCoder, pRes); - ASSERT(pRes->skey != 0); - ASSERT(pRes->ekey != 0); + ASSERT(taosArrayGetSize(pRes->uidList) == 0 || (pRes->skey != 0 && pRes->ekey != 0)); for (int32_t iUid = 0; iUid < taosArrayGetSize(pRes->uidList); iUid++) { code = tsdbDeleteTableData(pVnode->pTsdb, version, pRes->suid, *(uint64_t *)taosArrayGet(pRes->uidList, iUid), diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index a3e0b2b733..037bf543b1 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -52,7 +52,11 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu // TODO: if a block was set but not consumed, // prevent setting a different type of block pInfo->validBlockIndex = 0; - taosArrayClear(pInfo->pBlockLists); + if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) { + taosArrayClearP(pInfo->pBlockLists, taosMemoryFree); + } else { + taosArrayClear(pInfo->pBlockLists); + } if (type == STREAM_INPUT__MERGED_SUBMIT) { // ASSERT(numOfBlocks > 1); diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 7160541c13..dcf58759b0 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -76,11 +76,11 @@ typedef struct STopBotResItem { } STopBotResItem; typedef struct STopBotRes { - int32_t maxSize; - int16_t type; + int32_t maxSize; + int16_t type; - STuplePos nullTuplePos; - bool nullTupleSaved; + STuplePos nullTuplePos; + bool nullTupleSaved; STopBotResItem* pItems; } STopBotRes; @@ -223,14 +223,14 @@ typedef struct SMavgInfo { } SMavgInfo; typedef struct SSampleInfo { - int32_t samples; - int32_t totalPoints; - int32_t numSampled; - uint8_t colType; - int16_t colBytes; + int32_t samples; + int32_t totalPoints; + int32_t numSampled; + uint8_t colType; + int16_t colBytes; - STuplePos nullTuplePos; - bool nullTupleSaved; + STuplePos nullTuplePos; + bool nullTupleSaved; char* data; STuplePos* tuplePos; @@ -1147,7 +1147,7 @@ bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { } static STuplePos saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock); -static int32_t updateTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos); +static int32_t updateTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos); static const char* loadTupleData(SqlFunctionCtx* pCtx, const STuplePos* pPos); static int32_t findRowIndex(int32_t start, int32_t num, SColumnInfoData* pCol, const char* tval) { @@ -1357,8 +1357,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { numOfElems += 1; } - } else if (type == TSDB_DATA_TYPE_BIGINT || - type == TSDB_DATA_TYPE_TIMESTAMP) { + } else if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_TIMESTAMP) { int64_t* pData = (int64_t*)pCol->pData; int64_t* val = (int64_t*)&pBuf->v; @@ -1581,7 +1580,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { } _min_max_over: - if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pBuf->nullTupleSaved ) { + if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pBuf->nullTupleSaved) { pBuf->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock); pBuf->nullTupleSaved = true; } @@ -1601,7 +1600,8 @@ int32_t maxFunction(SqlFunctionCtx* pCtx) { } static void setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t rowIndex); -static void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos, int32_t rowIndex); +static void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos, + int32_t rowIndex); int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); @@ -1651,7 +1651,7 @@ void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuple if (pCtx->saveHandle.pBuf != NULL) { if (pTuplePos->pageId != -1) { - int32_t numOfCols = pCtx->subsidiaries.num; + int32_t numOfCols = pCtx->subsidiaries.num; const char* p = loadTupleData(pCtx, pTuplePos); bool* nullList = (bool*)p; @@ -1660,7 +1660,7 @@ void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuple // todo set the offset value to optimize the performance. for (int32_t j = 0; j < numOfCols; ++j) { SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j]; - int32_t dstSlotId = pc->pExpr->base.resSchema.slotId; + int32_t dstSlotId = pc->pExpr->base.resSchema.slotId; SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId); ASSERT(pc->pExpr->base.resSchema.bytes == pDstCol->info.bytes); @@ -1701,7 +1701,7 @@ void appendSelectivityValue(SqlFunctionCtx* pCtx, int32_t rowIndex, int32_t pos) char* pData = colDataGetData(pSrcCol, rowIndex); // append to dest col - int32_t dstSlotId = pc->pExpr->base.resSchema.slotId; + int32_t dstSlotId = pc->pExpr->base.resSchema.slotId; SColumnInfoData* pDstCol = taosArrayGet(pCtx->pDstBlock->pDataBlock, dstSlotId); ASSERT(pc->pExpr->base.resSchema.bytes == pDstCol->info.bytes); @@ -1712,7 +1712,6 @@ void appendSelectivityValue(SqlFunctionCtx* pCtx, int32_t rowIndex, int32_t pos) colDataAppend(pDstCol, pos, pData, false); } } - } void replaceTupleData(STuplePos* pDestPos, STuplePos* pSourcePos) { @@ -2590,8 +2589,8 @@ static void apercentileTransferInfo(SAPercentileInfo* pInput, SAPercentileInfo* memcpy(pHisto, pInput->pHisto, sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1)); pHisto->elems = (SHistBin*)((char*)pHisto + sizeof(SHistogramInfo)); - qDebug("%s merge histo, total:%" PRId64 ", entry:%d, %p", __FUNCTION__, pHisto->numOfElems, - pHisto->numOfEntries, pHisto); + qDebug("%s merge histo, total:%" PRId64 ", entry:%d, %p", __FUNCTION__, pHisto->numOfElems, pHisto->numOfEntries, + pHisto); } else { pHisto->elems = (SHistBin*)((char*)pHisto + sizeof(SHistogramInfo)); qDebug("%s input histogram, elem:%" PRId64 ", entry:%d, %p", __FUNCTION__, pHisto->numOfElems, @@ -2601,8 +2600,8 @@ static void apercentileTransferInfo(SAPercentileInfo* pInput, SAPercentileInfo* memcpy(pHisto, pRes, sizeof(SHistogramInfo) + sizeof(SHistBin) * MAX_HISTOGRAM_BIN); pHisto->elems = (SHistBin*)((char*)pHisto + sizeof(SHistogramInfo)); - qDebug("%s merge histo, total:%" PRId64 ", entry:%d, %p", __FUNCTION__, pHisto->numOfElems, - pHisto->numOfEntries, pHisto); + qDebug("%s merge histo, total:%" PRId64 ", entry:%d, %p", __FUNCTION__, pHisto->numOfElems, pHisto->numOfEntries, + pHisto); tHistogramDestroy(&pRes); } } @@ -2629,8 +2628,8 @@ int32_t apercentileFunctionMerge(SqlFunctionCtx* pCtx) { } if (pInfo->algo != APERCT_ALGO_TDIGEST) { - qDebug("%s after merge, total:%d, numOfEntry:%d, %p", __FUNCTION__, pInfo->pHisto->numOfElems, pInfo->pHisto->numOfEntries, - pInfo->pHisto); + qDebug("%s after merge, total:%d, numOfEntry:%d, %p", __FUNCTION__, pInfo->pHisto->numOfElems, + pInfo->pHisto->numOfEntries, pInfo->pHisto); } SET_VAL(pResInfo, 1, 1); @@ -2709,7 +2708,7 @@ int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) } EFuncDataRequired lastDynDataReq(void* pRes, STimeWindow* pTimeWindow) { - SResultRowEntryInfo* pEntry = (SResultRowEntryInfo*) pRes; + SResultRowEntryInfo* pEntry = (SResultRowEntryInfo*)pRes; // not initialized yet, data is required if (pEntry == NULL) { @@ -2752,7 +2751,8 @@ static FORCE_INLINE TSKEY getRowPTs(SColumnInfoData* pTsColInfo, int32_t rowInde return *(TSKEY*)colDataGetData(pTsColInfo, rowIndex); } -static void firstlastSaveTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SqlFunctionCtx* pCtx, SFirstLastRes* pInfo) { +static void firstlastSaveTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SqlFunctionCtx* pCtx, + SFirstLastRes* pInfo) { if (pCtx->subsidiaries.num <= 0) { return; } @@ -3176,7 +3176,7 @@ bool diffFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { static void doSetPrevVal(SDiffInfo* pDiffInfo, int32_t type, const char* pv) { switch (type) { case TSDB_DATA_TYPE_BOOL: - pDiffInfo->prev.i64 = *(bool*)pv? 1:0; + pDiffInfo->prev.i64 = *(bool*)pv ? 1 : 0; break; case TSDB_DATA_TYPE_TINYINT: pDiffInfo->prev.i64 = *(int8_t*)pv; @@ -3537,7 +3537,8 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData * |(n columns, one bit for each column)| src column #1| src column #2| * +------------------------------------+--------------+--------------+ */ -void* serializeTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SSubsidiaryResInfo* pSubsidiaryies, char* buf) { +void* serializeTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SSubsidiaryResInfo* pSubsidiaryies, + char* buf) { char* nullList = buf; char* pStart = (char*)(nullList + sizeof(bool) * pSubsidiaryies->num); @@ -3585,7 +3586,7 @@ static STuplePos doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf } } - p = (STuplePos) {.pageId = pHandle->currentPage, .offset = pPage->num}; + p = (STuplePos){.pageId = pHandle->currentPage, .offset = pPage->num}; memcpy(pPage->data + pPage->num, pBuf, length); pPage->num += length; @@ -3621,7 +3622,6 @@ static int32_t doUpdateTupleData(SSerializeDataHandle* pHandle, const void* pBuf setBufPageDirty(pPage, true); releaseBufPage(pHandle->pBuf, pPage); } else { - } return TSDB_CODE_SUCCESS; @@ -3636,7 +3636,7 @@ static int32_t updateTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSD static char* doLoadTupleData(SSerializeDataHandle* pHandle, const STuplePos* pPos) { if (pHandle->pBuf != NULL) { SFilePage* pPage = getBufPage(pHandle->pBuf, pPos->pageId); - char* p = pPage->data + pPos->offset; + char* p = pPage->data + pPos->offset; releaseBufPage(pHandle->pBuf, pPage); return p; } else { @@ -3980,8 +3980,8 @@ int32_t elapsedFunction(SqlFunctionCtx* pCtx) { } if (pCtx->end.key == INT64_MIN) { - pInfo->min = (pInfo->min > ptsList[start + pInput->numOfRows - 1]) ? - ptsList[start + pInput->numOfRows - 1] : pInfo->min; + pInfo->min = + (pInfo->min > ptsList[start + pInput->numOfRows - 1]) ? ptsList[start + pInput->numOfRows - 1] : pInfo->min; } else { pInfo->min = pCtx->end.key; } @@ -3993,8 +3993,8 @@ int32_t elapsedFunction(SqlFunctionCtx* pCtx) { } if (pCtx->end.key == INT64_MIN) { - pInfo->max = (pInfo->max < ptsList[start + pInput->numOfRows - 1]) ? - ptsList[start + pInput->numOfRows - 1] : pInfo->max; + pInfo->max = + (pInfo->max < ptsList[start + pInput->numOfRows - 1]) ? ptsList[start + pInput->numOfRows - 1] : pInfo->max; } else { pInfo->max = pCtx->end.key + 1; } -- GitLab