提交 6973e355 编写于 作者: D dapan1121

fix mem leak

上级 dd13924d
...@@ -87,6 +87,7 @@ typedef struct SQWMsg { ...@@ -87,6 +87,7 @@ typedef struct SQWMsg {
} SQWMsg; } SQWMsg;
typedef struct SQWHbParam { typedef struct SQWHbParam {
bool inUse;
int32_t qwrId; int32_t qwrId;
int64_t refId; int64_t refId;
} SQWHbParam; } SQWHbParam;
...@@ -161,6 +162,8 @@ typedef struct SQWorkerMgmt { ...@@ -161,6 +162,8 @@ typedef struct SQWorkerMgmt {
SRWLatch lock; SRWLatch lock;
int32_t qwRef; int32_t qwRef;
int32_t qwNum; int32_t qwNum;
SQWHbParam param[1024];
int32_t paramIdx;
} SQWorkerMgmt; } SQWorkerMgmt;
#define QW_FPARAMS_DEF SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId #define QW_FPARAMS_DEF SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId
......
...@@ -1340,7 +1340,6 @@ _return: ...@@ -1340,7 +1340,6 @@ _return:
void qwProcessHbTimerEvent(void *param, void *tmrId) { void qwProcessHbTimerEvent(void *param, void *tmrId) {
SQWHbParam* hbParam = (SQWHbParam*)param; SQWHbParam* hbParam = (SQWHbParam*)param;
if (hbParam->qwrId != atomic_load_32(&gQwMgmt.qwRef)) { if (hbParam->qwrId != atomic_load_32(&gQwMgmt.qwRef)) {
taosMemoryFree(param);
return; return;
} }
...@@ -1463,6 +1462,28 @@ int32_t qwOpenRef(void) { ...@@ -1463,6 +1462,28 @@ int32_t qwOpenRef(void) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void qwSetHbParam(int64_t refId, SQWHbParam **pParam) {
int32_t paramIdx = 0;
int32_t newParamIdx = 0;
while (true) {
paramIdx = atomic_load_32(&gQwMgmt.paramIdx);
if (paramIdx == tListLen(gQwMgmt.param)) {
newParamIdx = 0;
} else {
newParamIdx = paramIdx + 1;
}
if (paramIdx == atomic_val_compare_exchange_32(&gQwMgmt.paramIdx, paramIdx, newParamIdx)) {
break;
}
}
gQwMgmt.param[paramIdx].qwrId = gQwMgmt.qwRef;
gQwMgmt.param[paramIdx].refId = refId;
*pParam = &gQwMgmt.param[paramIdx];
}
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb) { int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb) {
if (NULL == qWorkerMgmt || pMsgCb->pWrapper == NULL) { if (NULL == qWorkerMgmt || pMsgCb->pWrapper == NULL) {
...@@ -1470,7 +1491,10 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW ...@@ -1470,7 +1491,10 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW
QW_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
atomic_add_fetch_32(&gQwMgmt.qwNum, 1); int32_t qwNum = atomic_add_fetch_32(&gQwMgmt.qwNum, 1);
if (1 == qwNum) {
memset(gQwMgmt.param, 0, sizeof(gQwMgmt.param));
}
int32_t code = qwOpenRef(); int32_t code = qwOpenRef();
if (code) { if (code) {
...@@ -1533,13 +1557,8 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW ...@@ -1533,13 +1557,8 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW
QW_ERR_JRET(terrno); QW_ERR_JRET(terrno);
} }
SQWHbParam *param = taosMemoryMalloc(sizeof(SQWHbParam)); SQWHbParam *param = NULL;
if (NULL == param) { qwSetHbParam(mgmt->refId, &param);
qError("malloc hb param failed, error:%s", tstrerror(terrno));
QW_ERR_JRET(terrno);
}
param->qwrId = gQwMgmt.qwRef;
param->refId = mgmt->refId;
mgmt->hbTimer = taosTmrStart(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, (void*)param, mgmt->timer); mgmt->hbTimer = taosTmrStart(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, (void*)param, mgmt->timer);
if (NULL == mgmt->hbTimer) { if (NULL == mgmt->hbTimer) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册