From 293a96d9590f4308075149be2ea146836b32e155 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 27 Jul 2022 21:06:40 +0800 Subject: [PATCH] fix mem leak --- source/libs/index/src/indexFilter.c | 32 ++++++++--- source/libs/transport/src/transCli.c | 83 ++++++++++++++++++++++++---- source/libs/transport/src/transSvr.c | 55 +++++++++++++++++- 3 files changed, 151 insertions(+), 19 deletions(-) diff --git a/source/libs/index/src/indexFilter.c b/source/libs/index/src/indexFilter.c index 7fc41b8dff..70de7ce66e 100644 --- a/source/libs/index/src/indexFilter.c +++ b/source/libs/index/src/indexFilter.c @@ -582,8 +582,11 @@ static int32_t sifExecLogic(SLogicConditionNode *node, SIFCtx *ctx, SIFParam *ou // add impl later if (node->condType == LOGIC_COND_TYPE_AND) { taosArrayAddAll(output->result, params[m].result); + taosArrayDestroy(params[m].result); + params[m].result = NULL; } else if (node->condType == LOGIC_COND_TYPE_OR) { taosArrayAddAll(output->result, params[m].result); + params[m].result = NULL; } else if (node->condType == LOGIC_COND_TYPE_NOT) { // taosArrayAddAll(output->result, params[m].result); } @@ -593,6 +596,8 @@ static int32_t sifExecLogic(SLogicConditionNode *node, SIFCtx *ctx, SIFParam *ou } else { for (int32_t m = 0; m < node->pParameterList->length; m++) { output->status = sifMergeCond(node->condType, output->status, params[m].status); + taosArrayDestroy(params[m].result); + params[m].result = NULL; } } _return: @@ -607,6 +612,7 @@ static EDealRes sifWalkFunction(SNode *pNode, void *context) { SIFCtx *ctx = context; ctx->code = sifExecFunction(node, ctx, &output); if (ctx->code != TSDB_CODE_SUCCESS) { + sifFreeParam(&output); return DEAL_RES_ERROR; } @@ -624,6 +630,7 @@ static EDealRes sifWalkLogic(SNode *pNode, void *context) { SIFCtx *ctx = context; ctx->code = sifExecLogic(node, ctx, &output); if (ctx->code) { + sifFreeParam(&output); return DEAL_RES_ERROR; } @@ -640,6 +647,7 @@ static EDealRes sifWalkOper(SNode *pNode, void *context) { SIFCtx *ctx = context; ctx->code = sifExecOper(node, ctx, &output); if (ctx->code) { + sifFreeParam(&output); return DEAL_RES_ERROR; } if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) { @@ -698,7 +706,11 @@ static int32_t sifCalculate(SNode *pNode, SIFParam *pDst) { } nodesWalkExprPostOrder(pNode, sifCalcWalker, &ctx); - SIF_ERR_RET(ctx.code); + + if (ctx.code != 0) { + sifFreeRes(ctx.pRes); + return ctx.code; + } if (pDst) { SIFParam *res = (SIFParam *)taosHashGet(ctx.pRes, (void *)&pNode, POINTER_BYTES); @@ -714,8 +726,7 @@ static int32_t sifCalculate(SNode *pNode, SIFParam *pDst) { taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES); } sifFreeRes(ctx.pRes); - - SIF_RET(code); + return code; } static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status) { @@ -732,8 +743,10 @@ static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status) { } nodesWalkExprPostOrder(pNode, sifCalcWalker, &ctx); - - SIF_ERR_RET(ctx.code); + if (ctx.code != 0) { + sifFreeRes(ctx.pRes); + return ctx.code; + } SIFParam *res = (SIFParam *)taosHashGet(ctx.pRes, (void *)&pNode, POINTER_BYTES); if (res == NULL) { @@ -745,8 +758,7 @@ static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status) { sifFreeParam(res); taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES); taosHashCleanup(ctx.pRes); - - SIF_RET(code); + return code; } int32_t doFilterTag(SNode *pFilterNode, SIndexMetaArg *metaArg, SArray *result, SIdxFltStatus *status) { @@ -760,7 +772,11 @@ int32_t doFilterTag(SNode *pFilterNode, SIndexMetaArg *metaArg, SArray *result, SArray *output = taosArrayInit(8, sizeof(uint64_t)); SIFParam param = {.arg = *metaArg, .result = output}; - SIF_ERR_RET(sifCalculate((SNode *)pFilterNode, ¶m)); + int32_t code = sifCalculate((SNode *)pFilterNode, ¶m); + if (code != 0) { + sifFreeParam(¶m); + return code; + } taosArrayAddAll(result, param.result); sifFreeParam(¶m); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 064c110a9f..78d4bad8bd 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1,5 +1,4 @@ /** Copyright (c) 2019 TAOS Data, Inc. - * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 @@ -56,13 +55,14 @@ typedef struct SCliMsg { } SCliMsg; typedef struct SCliThrd { - TdThread thread; // tid - int64_t pid; // pid - uv_loop_t* loop; - SAsyncPool* asyncPool; - uv_idle_t* idle; - uv_timer_t timer; - void* pool; // conn pool + TdThread thread; // tid + int64_t pid; // pid + uv_loop_t* loop; + SAsyncPool* asyncPool; + uv_idle_t* idle; + uv_prepare_t* prepare; + uv_timer_t timer; + void* pool; // conn pool // msg queue queue msg; @@ -118,6 +118,7 @@ static void cliSendCb(uv_write_t* req, int status); static void cliConnCb(uv_connect_t* req, int status); static void cliAsyncCb(uv_async_t* handle); static void cliIdleCb(uv_idle_t* handle); +static void cliPrepareCb(uv_prepare_t* handle); static int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg); @@ -198,7 +199,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { pThrd = (SCliThrd*)(exh)->pThrd; \ } \ } while (0) -#define CONN_PERSIST_TIME(para) ((para) == 0 ? 3 * 1000 : (para)) +#define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para)) #define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL) #define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label) #define CONN_SHOULD_RELEASE(conn, head) \ @@ -967,6 +968,62 @@ static void cliAsyncCb(uv_async_t* handle) { static void cliIdleCb(uv_idle_t* handle) { SCliThrd* thrd = handle->data; tTrace("do idle work"); + + SAsyncPool* pool = thrd->asyncPool; + for (int i = 0; i < pool->nAsync; i++) { + uv_async_t* async = &(pool->asyncs[i]); + SAsyncItem* item = async->data; + + queue wq; + taosThreadMutexLock(&item->mtx); + QUEUE_MOVE(&item->qmsg, &wq); + taosThreadMutexUnlock(&item->mtx); + + int count = 0; + while (!QUEUE_IS_EMPTY(&wq)) { + queue* h = QUEUE_HEAD(&wq); + QUEUE_REMOVE(h); + + SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); + if (pMsg == NULL) { + continue; + } + (*cliAsyncHandle[pMsg->type])(pMsg, thrd); + count++; + } + } + tTrace("prepare work end"); + if (thrd->stopMsg != NULL) cliHandleQuit(thrd->stopMsg, thrd); +} +static void cliPrepareCb(uv_prepare_t* handle) { + SCliThrd* thrd = handle->data; + tTrace("prepare work start"); + + SAsyncPool* pool = thrd->asyncPool; + for (int i = 0; i < pool->nAsync; i++) { + uv_async_t* async = &(pool->asyncs[i]); + SAsyncItem* item = async->data; + + queue wq; + taosThreadMutexLock(&item->mtx); + QUEUE_MOVE(&item->qmsg, &wq); + taosThreadMutexUnlock(&item->mtx); + + int count = 0; + while (!QUEUE_IS_EMPTY(&wq)) { + queue* h = QUEUE_HEAD(&wq); + QUEUE_REMOVE(h); + + SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); + if (pMsg == NULL) { + continue; + } + (*cliAsyncHandle[pMsg->type])(pMsg, thrd); + count++; + } + } + tTrace("prepare work end"); + if (thrd->stopMsg != NULL) cliHandleQuit(thrd->stopMsg, thrd); } static void* cliWorkThread(void* arg) { @@ -1033,7 +1090,12 @@ static SCliThrd* createThrdObj() { // pThrd->idle = taosMemoryCalloc(1, sizeof(uv_idle_t)); // uv_idle_init(pThrd->loop, pThrd->idle); // pThrd->idle->data = pThrd; - // uv_idle_start(pThrd->idle, cliIdleCb); + // uv_idle_start(pThrd->idle, cliIdleCb); + + pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t)); + uv_prepare_init(pThrd->loop, pThrd->prepare); + pThrd->prepare->data = pThrd; + uv_prepare_start(pThrd->prepare, cliPrepareCb); pThrd->pool = createConnPool(4); transDQCreate(pThrd->loop, &pThrd->delayQueue); @@ -1058,6 +1120,7 @@ static void destroyThrdObj(SCliThrd* pThrd) { transDQDestroy(pThrd->timeoutQueue, NULL); taosMemoryFree(pThrd->idle); + taosMemoryFree(pThrd->prepare); taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd); } diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index a97e0b53c1..11b54c575e 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -73,6 +73,7 @@ typedef struct SWorkThrd { uv_os_fd_t fd; uv_loop_t* loop; SAsyncPool* asyncPool; + uv_prepare_t* prepare; queue msg; TdThreadMutex msgMtx; @@ -112,6 +113,7 @@ static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) static void uvWorkerAsyncCb(uv_async_t* handle); static void uvAcceptAsyncCb(uv_async_t* handle); static void uvShutDownCb(uv_shutdown_t* req, int status); +static void uvPrepareCb(uv_prepare_t* handle); /* * time-consuming task throwed into BG work thread @@ -546,6 +548,52 @@ static void uvShutDownCb(uv_shutdown_t* req, int status) { uv_close((uv_handle_t*)req->handle, uvDestroyConn); taosMemoryFree(req); } +static void uvPrepareCb(uv_prepare_t* handle) { + // prepare callback + SWorkThrd* pThrd = handle->data; + SAsyncPool* pool = pThrd->asyncPool; + + for (int i = 0; i < pool->nAsync; i++) { + uv_async_t* async = &(pool->asyncs[i]); + SAsyncItem* item = async->data; + + queue wq; + taosThreadMutexLock(&item->mtx); + QUEUE_MOVE(&item->qmsg, &wq); + taosThreadMutexUnlock(&item->mtx); + + while (!QUEUE_IS_EMPTY(&wq)) { + queue* head = QUEUE_HEAD(&wq); + QUEUE_REMOVE(head); + + SSvrMsg* msg = QUEUE_DATA(head, SSvrMsg, q); + if (msg == NULL) { + tError("unexcept occurred, continue"); + continue; + } + // release handle to rpc init + if (msg->type == Quit) { + (*transAsyncHandle[msg->type])(msg, pThrd); + continue; + } else { + STransMsg transMsg = msg->msg; + + SExHandle* exh1 = transMsg.info.handle; + int64_t refId = transMsg.info.refId; + SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), refId); + if (exh2 == NULL || exh1 != exh2) { + tTrace("handle except msg %p, ignore it", exh1); + transReleaseExHandle(transGetRefMgt(), refId); + destroySmsg(msg); + continue; + } + msg->pConn = exh1->handle; + transReleaseExHandle(transGetRefMgt(), refId); + (*transAsyncHandle[msg->type])(msg, pThrd); + } + } + } +} static void uvWorkDoTask(uv_work_t* req) { // doing time-consumeing task @@ -695,13 +743,17 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) { } uv_pipe_init(pThrd->loop, pThrd->pipe, 1); - // int r = uv_pipe_open(pThrd->pipe, pThrd->fd); pThrd->pipe->data = pThrd; QUEUE_INIT(&pThrd->msg); taosThreadMutexInit(&pThrd->msgMtx, NULL); + pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t)); + uv_prepare_init(pThrd->loop, pThrd->prepare); + uv_prepare_start(pThrd->prepare, uvPrepareCb); + pThrd->prepare->data = pThrd; + // conn set QUEUE_INIT(&pThrd->conn); @@ -986,6 +1038,7 @@ void destroyWorkThrd(SWorkThrd* pThrd) { SRV_RELEASE_UV(pThrd->loop); TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrMsg, destroySmsg); transAsyncPoolDestroy(pThrd->asyncPool); + taosMemoryFree(pThrd->prepare); taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd); } -- GitLab