From 52b28fbceb3b7a9e9a88eb7bc8a40009d27968c1 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 11 Nov 2022 19:16:51 +0800 Subject: [PATCH] fix ahandle mem leak --- source/client/src/clientHb.c | 5 ++++- source/libs/transport/inc/transComm.h | 2 +- source/libs/transport/src/transCli.c | 28 ++++++++++++++++++++++++--- source/libs/transport/src/transComm.c | 2 +- 4 files changed, 31 insertions(+), 6 deletions(-) diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index bf34d3e2df..0f881beb66 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -61,7 +61,8 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog int32_t numOfBatchs = taosArrayGetSize(batchUseRsp.pArray); for (int32_t i = 0; i < numOfBatchs; ++i) { SUseDbRsp *rsp = taosArrayGet(batchUseRsp.pArray, i); - tscDebug("hb db rsp, db:%s, vgVersion:%d, stateTs:%" PRId64 ", uid:%" PRIx64, rsp->db, rsp->vgVersion, rsp->stateTs, rsp->uid); + tscDebug("hb db rsp, db:%s, vgVersion:%d, stateTs:%" PRId64 ", uid:%" PRIx64, rsp->db, rsp->vgVersion, rsp->stateTs, + rsp->uid); if (rsp->vgVersion < 0) { code = catalogRemoveDB(pCatalog, rsp->db, rsp->uid); @@ -293,6 +294,7 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) { taosThreadMutexUnlock(&appInfo.mutex); tscError("cluster not exist, key:%s", key); taosMemoryFree(pMsg->pData); + taosMemoryFree(pMsg->pEpSet); tFreeClientHbBatchRsp(&pRsp); return -1; } @@ -322,6 +324,7 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) { tFreeClientHbBatchRsp(&pRsp); taosMemoryFree(pMsg->pData); + taosMemoryFree(pMsg->pEpSet); return code; } diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index f33067f62a..ac54749ae1 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -96,7 +96,7 @@ typedef void* queue[2]; //#define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit //#define TRANS_RETRY_INTERVAL 15 // retry interval (ms) -#define TRANS_CONN_TIMEOUT 3000 // connect timeout (s) +#define TRANS_CONN_TIMEOUT 3000 // connect timeout (ms) #define TRANS_READ_TIMEOUT 3000 // read timeout (ms) #define TRANS_PACKET_LIMIT 1024 * 1024 * 512 diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 700c5ee1c9..a0df371bb6 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -80,6 +80,7 @@ typedef struct SCliThrd { uint64_t nextTimeout; // next timeout void* pTransInst; // + void (*destroyAhandleFp)(void* ahandle); SHashObj* fqdn2ipCache; SCvtAddr cvtAddr; @@ -158,6 +159,7 @@ static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, static FORCE_INLINE void destroyUserdata(STransMsg* userdata); static FORCE_INLINE void destroyCmsg(void* cmsg); +static FORCE_INLINE void destroyCmsgAndAhandle(void* cmsg); static FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst); static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx); @@ -569,7 +571,7 @@ static void addConnToPool(void* pool, SCliConn* conn) { if (conn->list->size >= 50) { STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg)); arg->param1 = conn; - arg->param2 = NULL; + arg->param2 = thrd; STrans* pTransInst = thrd->pTransInst; conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime)); @@ -693,8 +695,8 @@ static void cliDestroyConn(SCliConn* conn, bool clear) { } if (conn->timer != NULL) { uv_timer_stop(conn->timer); - taosArrayPush(pThrd->timerList, &conn->timer); conn->timer->data = NULL; + taosArrayPush(pThrd->timerList, &conn->timer); conn->timer = NULL; } @@ -1213,6 +1215,25 @@ static FORCE_INLINE void destroyCmsg(void* arg) { taosMemoryFree(pMsg); } +static FORCE_INLINE void destroyCmsgAndAhandle(void* param) { + if (param == NULL) return; + + STaskArg* arg = param; + SCliMsg* pMsg = arg->param1; + SCliThrd* pThrd = arg->param2; + + tDebug("destroy Ahandle A"); + if (pThrd != NULL && pThrd->destroyAhandleFp != NULL) { + tDebug("destroy Ahandle B"); + pThrd->destroyAhandleFp(pMsg->ctx->ahandle); + } + tDebug("destroy Ahandle C"); + + transDestroyConnCtx(pMsg->ctx); + destroyUserdata(&pMsg->msg); + taosMemoryFree(pMsg); +} + static SCliThrd* createThrdObj(void* trans) { STrans* pTransInst = trans; @@ -1247,6 +1268,7 @@ static SCliThrd* createThrdObj(void* trans) { pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); pThrd->pTransInst = trans; + pThrd->destroyAhandleFp = pTransInst->destroyFp; pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pThrd->quit = false; return pThrd; @@ -1262,7 +1284,7 @@ static void destroyThrdObj(SCliThrd* pThrd) { TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliMsg, destroyCmsg); transAsyncPoolDestroy(pThrd->asyncPool); - transDQDestroy(pThrd->delayQueue, destroyCmsg); + transDQDestroy(pThrd->delayQueue, destroyCmsgAndAhandle); transDQDestroy(pThrd->timeoutQueue, NULL); tDebug("thread destroy %" PRId64, pThrd->pid); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 18b812f314..7710abcaa1 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -497,7 +497,7 @@ void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) { SDelayTask* task = container_of(minNode, SDelayTask, node); STaskArg* arg = task->arg; - if (freeFunc) freeFunc(arg->param1); + if (freeFunc) freeFunc(arg); taosMemoryFree(arg); taosMemoryFree(task); -- GitLab