From 6973e3555474715d09407bcfb29443499e2277a8 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 29 Apr 2022 15:13:44 +0800 Subject: [PATCH] fix mem leak --- source/libs/qworker/inc/qworkerInt.h | 9 ++++--- source/libs/qworker/src/qworker.c | 39 +++++++++++++++++++++------- 2 files changed, 35 insertions(+), 13 deletions(-) diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h index 4f2f3febaf..a2b1353093 100644 --- a/source/libs/qworker/inc/qworkerInt.h +++ b/source/libs/qworker/inc/qworkerInt.h @@ -87,6 +87,7 @@ typedef struct SQWMsg { } SQWMsg; typedef struct SQWHbParam { + bool inUse; int32_t qwrId; int64_t refId; } SQWHbParam; @@ -158,9 +159,11 @@ typedef struct SQWorker { } SQWorker; typedef struct SQWorkerMgmt { - SRWLatch lock; - int32_t qwRef; - int32_t qwNum; + SRWLatch lock; + int32_t qwRef; + int32_t qwNum; + SQWHbParam param[1024]; + int32_t paramIdx; } SQWorkerMgmt; #define QW_FPARAMS_DEF SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index c94dd29ea1..e84f387dbe 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -1340,7 +1340,6 @@ _return: void qwProcessHbTimerEvent(void *param, void *tmrId) { SQWHbParam* hbParam = (SQWHbParam*)param; if (hbParam->qwrId != atomic_load_32(&gQwMgmt.qwRef)) { - taosMemoryFree(param); return; } @@ -1463,6 +1462,28 @@ int32_t qwOpenRef(void) { return TSDB_CODE_SUCCESS; } +void qwSetHbParam(int64_t refId, SQWHbParam **pParam) { + int32_t paramIdx = 0; + int32_t newParamIdx = 0; + + while (true) { + paramIdx = atomic_load_32(&gQwMgmt.paramIdx); + if (paramIdx == tListLen(gQwMgmt.param)) { + newParamIdx = 0; + } else { + newParamIdx = paramIdx + 1; + } + + if (paramIdx == atomic_val_compare_exchange_32(&gQwMgmt.paramIdx, paramIdx, newParamIdx)) { + break; + } + } + + gQwMgmt.param[paramIdx].qwrId = gQwMgmt.qwRef; + gQwMgmt.param[paramIdx].refId = refId; + + *pParam = &gQwMgmt.param[paramIdx]; +} int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb) { if (NULL == qWorkerMgmt || pMsgCb->pWrapper == NULL) { @@ -1470,7 +1491,10 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW QW_RET(TSDB_CODE_QRY_INVALID_INPUT); } - atomic_add_fetch_32(&gQwMgmt.qwNum, 1); + int32_t qwNum = atomic_add_fetch_32(&gQwMgmt.qwNum, 1); + if (1 == qwNum) { + memset(gQwMgmt.param, 0, sizeof(gQwMgmt.param)); + } int32_t code = qwOpenRef(); if (code) { @@ -1533,14 +1557,9 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW QW_ERR_JRET(terrno); } - SQWHbParam *param = taosMemoryMalloc(sizeof(SQWHbParam)); - if (NULL == param) { - qError("malloc hb param failed, error:%s", tstrerror(terrno)); - QW_ERR_JRET(terrno); - } - param->qwrId = gQwMgmt.qwRef; - param->refId = mgmt->refId; - + SQWHbParam *param = NULL; + qwSetHbParam(mgmt->refId, ¶m); + mgmt->hbTimer = taosTmrStart(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, (void*)param, mgmt->timer); if (NULL == mgmt->hbTimer) { qError("start hb timer failed"); -- GitLab