diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 8d55a7bea1d33e0730f6ff2d7da86290fdf3fef7..a8ed470f50fc97fd7085edb2e6404c94d38974e6 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -830,7 +830,6 @@ int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { SMqRebVgReq req = {0}; tDecodeSMqRebVgReq(msg, &req); - // todo lock tqDebug("vgId:%d, tq process sub req %s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey, req.oldConsumerId, req.newConsumerId); @@ -863,8 +862,10 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg // TODO version should be assigned and refed during preprocess SWalRef* pRef = walRefCommittedVer(pTq->pVnode->pWal); if (pRef == NULL) { + taosMemoryFree(req.qmsg); return -1; } + int64_t ver = pRef->refVer; pHandle->pRef = pRef; @@ -921,6 +922,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg tqDebug("try to persist handle %s consumer:0x%" PRIx64 " , old consumer:0x%" PRIx64, req.subKey, pHandle->consumerId, oldConsumerId); if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) { + taosMemoryFree(req.qmsg); return -1; } } else { @@ -928,6 +930,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs", req.vgId, req.newConsumerId); atomic_store_32(&pHandle->epoch, -1); atomic_add_fetch_32(&pHandle->epoch, 1); + taosMemoryFree(req.qmsg); return tqMetaSaveHandle(pTq, req.subKey, pHandle); } @@ -950,10 +953,12 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg taosWUnLockLatch(&pTq->pushLock); if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) { + taosMemoryFree(req.qmsg); return -1; } } + taosMemoryFree(req.qmsg); return 0; }