提交 d8807d3d 编写于 作者: H Haojun Liao

fix(tmq): fix memory leak.

上级 1f71cf3a
...@@ -830,7 +830,6 @@ int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t ...@@ -830,7 +830,6 @@ int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
SMqRebVgReq req = {0}; SMqRebVgReq req = {0};
tDecodeSMqRebVgReq(msg, &req); tDecodeSMqRebVgReq(msg, &req);
// todo lock
tqDebug("vgId:%d, tq process sub req %s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey, tqDebug("vgId:%d, tq process sub req %s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey,
req.oldConsumerId, req.newConsumerId); req.oldConsumerId, req.newConsumerId);
...@@ -863,8 +862,10 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ...@@ -863,8 +862,10 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
// TODO version should be assigned and refed during preprocess // TODO version should be assigned and refed during preprocess
SWalRef* pRef = walRefCommittedVer(pTq->pVnode->pWal); SWalRef* pRef = walRefCommittedVer(pTq->pVnode->pWal);
if (pRef == NULL) { if (pRef == NULL) {
taosMemoryFree(req.qmsg);
return -1; return -1;
} }
int64_t ver = pRef->refVer; int64_t ver = pRef->refVer;
pHandle->pRef = pRef; pHandle->pRef = pRef;
...@@ -921,6 +922,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ...@@ -921,6 +922,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
tqDebug("try to persist handle %s consumer:0x%" PRIx64 " , old consumer:0x%" PRIx64, req.subKey, tqDebug("try to persist handle %s consumer:0x%" PRIx64 " , old consumer:0x%" PRIx64, req.subKey,
pHandle->consumerId, oldConsumerId); pHandle->consumerId, oldConsumerId);
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) { if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
taosMemoryFree(req.qmsg);
return -1; return -1;
} }
} else { } else {
...@@ -928,6 +930,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ...@@ -928,6 +930,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs", req.vgId, req.newConsumerId); tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs", req.vgId, req.newConsumerId);
atomic_store_32(&pHandle->epoch, -1); atomic_store_32(&pHandle->epoch, -1);
atomic_add_fetch_32(&pHandle->epoch, 1); atomic_add_fetch_32(&pHandle->epoch, 1);
taosMemoryFree(req.qmsg);
return tqMetaSaveHandle(pTq, req.subKey, pHandle); return tqMetaSaveHandle(pTq, req.subKey, pHandle);
} }
...@@ -950,10 +953,12 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ...@@ -950,10 +953,12 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
taosWUnLockLatch(&pTq->pushLock); taosWUnLockLatch(&pTq->pushLock);
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) { if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
taosMemoryFree(req.qmsg);
return -1; return -1;
} }
} }
taosMemoryFree(req.qmsg);
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册