From e35d145734406d1fb624b59f8cdc84816dbf434c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 16 May 2023 13:59:13 +0800 Subject: [PATCH] fix(stream): fix memory leak. --- include/libs/stream/tstream.h | 1 - source/dnode/vnode/src/tq/tq.c | 12 ++---------- source/dnode/vnode/src/tq/tqUtil.c | 2 +- source/libs/executor/src/scanoperator.c | 1 - source/libs/stream/src/streamData.c | 8 +------- 5 files changed, 4 insertions(+), 20 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 5fd9a8b12b..0d021e2fa2 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -135,7 +135,6 @@ typedef struct { typedef struct { int8_t type; int64_t ver; - int32_t* dataRef; SSDataBlock* pBlock; } SStreamRefDataBlock; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 81d64fb98a..81bb5034f1 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -947,6 +947,8 @@ int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStream SDecoder* pCoder = &(SDecoder){0}; SDeleteRes* pRes = &(SDeleteRes){0}; + *pRefBlock = NULL; + pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t)); if (pRes->uidList == NULL) { return TSDB_CODE_OUT_OF_MEMORY; @@ -984,21 +986,13 @@ int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStream } taosArrayDestroy(pRes->uidList); - - int32_t* pRef = taosMemoryMalloc(sizeof(int32_t)); - *pRef = 1; - *pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0); if (pRefBlock == NULL) { - taosMemoryFree(pRef); return TSDB_CODE_OUT_OF_MEMORY; } (*pRefBlock)->type = STREAM_INPUT__REF_DATA_BLOCK; (*pRefBlock)->pBlock = pDelBlock; - (*pRefBlock)->dataRef = pRef; - atomic_add_fetch_32((*pRefBlock)->dataRef, 1); - return TSDB_CODE_SUCCESS; } @@ -1069,8 +1063,6 @@ int32_t tqProcessDeleteDataReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { SStreamRefDataBlock* pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0); pRefBlock->type = STREAM_INPUT__REF_DATA_BLOCK; pRefBlock->pBlock = pDelBlock; - pRefBlock->dataRef = pRef; - atomic_add_fetch_32(pRefBlock->dataRef, 1); if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pRefBlock) < 0) { atomic_sub_fetch_32(pRef, 1); diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 27e817c50f..c74a25eba1 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -29,7 +29,7 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) { int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem) { int32_t code = tAppendDataToInputQueue(pTask, pQueueItem); if (code < 0) { - tqError("s-task:%s failed to put into queue, too many, next ver:%" PRId64, pTask->id.idStr, /*pPackedData->ver*/ 0L); + tqError("s-task:%s failed to put into queue, too many", pTask->id.idStr); return -1; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 7cb3c00c1a..5abab30e7c 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1912,7 +1912,6 @@ FETCH_NEXT_BLOCK: if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) { if (pInfo->validBlockIndex >= total) { doClearBufferedBlocks(pInfo); - /*pOperator->status = OP_EXEC_DONE;*/ return NULL; } diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index ae616260f3..96022850a3 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -194,13 +194,7 @@ void streamFreeQitem(SStreamQueueItem* data) { taosFreeQitem(pMerge); } else if (type == STREAM_INPUT__REF_DATA_BLOCK) { SStreamRefDataBlock* pRefBlock = (SStreamRefDataBlock*)data; - - int32_t ref = atomic_sub_fetch_32(pRefBlock->dataRef, 1); - ASSERT(ref >= 0); - if (ref == 0) { - blockDataDestroy(pRefBlock->pBlock); - taosMemoryFree(pRefBlock->dataRef); - } + blockDataDestroy(pRefBlock->pBlock); taosFreeQitem(pRefBlock); } } -- GitLab