From f4864b87135a7c8e852e30b8e680e32699732582 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 22 Jul 2023 01:02:58 +0800 Subject: [PATCH] fix(stream): fix memory leak. --- source/libs/stream/inc/streamInt.h | 6 ++++++ source/libs/stream/src/streamDispatch.c | 6 ------ source/libs/stream/src/streamTask.c | 10 +++++++++- 3 files changed, 15 insertions(+), 7 deletions(-) diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index ff3f35bfed..add893c8c7 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -31,6 +31,12 @@ typedef struct { void* timer; } SStreamGlobalEnv; +typedef struct { + SEpSet epset; + int32_t taskId; + SRpcMsg msg; +} SStreamContinueExecInfo; + extern SStreamGlobalEnv streamEnv; void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 88af841f05..ca5d5994b7 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -648,12 +648,6 @@ int32_t tDecodeCompleteHistoryDataMsg(SDecoder* pDecoder, SStreamCompleteHistory return 0; } -typedef struct { - SEpSet epset; - int32_t taskId; - SRpcMsg msg; -} SStreamContinueExecInfo; - int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq) { int32_t len = 0; int32_t code = 0; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index d54d5fa8b8..863c4ce025 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -13,6 +13,8 @@ * along with this program. If not, see . */ +#include +#include #include "executor.h" #include "tstream.h" #include "wal.h" @@ -203,6 +205,11 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { return 0; } +static void freeItem(void* p) { + SStreamContinueExecInfo* pInfo = p; + rpcFreeCont(pInfo->msg.pCont); +} + void tFreeStreamTask(SStreamTask* pTask) { qDebug("free s-task:%s", pTask->id.idStr); @@ -252,7 +259,8 @@ void tFreeStreamTask(SStreamTask* pTask) { } if (pTask->pRspMsgList != NULL) { - pTask->pRspMsgList = taosArrayDestroy(pTask->pRspMsgList); + taosArrayDestroyEx(pTask->pRspMsgList, freeItem); + pTask->pRspMsgList = NULL; } taosThreadMutexDestroy(&pTask->lock); -- GitLab