diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 6603c4019f5c92111571633aa0908f4582485e74..f9ade7287fc4a09ec62d6fc9ed4d46433dd5f5f2 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -597,7 +597,6 @@ int32_t tmqAsyncAskEp(tmq_t* tmq, bool wait) { asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); END: - tfree(buf); if (wait) tsem_wait(&tmq->rspSem); return 0; } diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 1b7383bee19f4a2887cd0af723bf488185d61d10..a9a633cf63e20931b6a09e8daebc9c3d37db2d6d 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -526,7 +526,7 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub) return NULL; } for (int32_t i = 0; i < sz; i++) { - SMqConsumerEp cEp; + SMqConsumerEp cEp = {0}; buf = tDecodeSMqConsumerEp(buf, &cEp); taosArrayPush(pSub->assigned, &cEp); } @@ -539,7 +539,7 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub) return NULL; } for (int32_t i = 0; i < sz; i++) { - SMqConsumerEp cEp; + SMqConsumerEp cEp = {0}; buf = tDecodeSMqConsumerEp(buf, &cEp); taosArrayPush(pSub->lostConsumer, &cEp); } @@ -553,7 +553,7 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub) return NULL; } for (int32_t i = 0; i < sz; i++) { - SMqConsumerEp cEp; + SMqConsumerEp cEp = {0}; buf = tDecodeSMqConsumerEp(buf, &cEp); taosArrayPush(pSub->idleConsumer, &cEp); } @@ -569,7 +569,7 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub) return NULL; } for (int32_t i = 0; i < sz; i++) { - SMqConsumerEp cEp; + SMqConsumerEp cEp = {0}; buf = tDecodeSMqConsumerEp(buf, &cEp); taosArrayPush(pSub->unassignedVg, &cEp); } @@ -578,11 +578,26 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub) } static FORCE_INLINE void tDeleteSMqSubscribeObj(SMqSubscribeObj* pSub) { - if (pSub->availConsumer) taosArrayDestroy(pSub->availConsumer); - if (pSub->assigned) taosArrayDestroyEx(pSub->assigned, (void (*)(void*))tDeleteSMqConsumerEp); - if (pSub->unassignedVg) taosArrayDestroyEx(pSub->unassignedVg, (void (*)(void*))tDeleteSMqConsumerEp); - if (pSub->idleConsumer) taosArrayDestroyEx(pSub->idleConsumer, (void (*)(void*))tDeleteSMqConsumerEp); - if (pSub->lostConsumer) taosArrayDestroyEx(pSub->lostConsumer, (void (*)(void*))tDeleteSMqConsumerEp); + if (pSub->availConsumer) { + taosArrayDestroy(pSub->availConsumer); + pSub->availConsumer = NULL; + } + if (pSub->assigned) { + taosArrayDestroyEx(pSub->assigned, (void (*)(void*))tDeleteSMqConsumerEp); + pSub->assigned = NULL; + } + if (pSub->unassignedVg) { + taosArrayDestroyEx(pSub->unassignedVg, (void (*)(void*))tDeleteSMqConsumerEp); + pSub->unassignedVg = NULL; + } + if (pSub->idleConsumer) { + taosArrayDestroyEx(pSub->idleConsumer, (void (*)(void*))tDeleteSMqConsumerEp); + pSub->idleConsumer = NULL; + } + if (pSub->lostConsumer) { + taosArrayDestroyEx(pSub->lostConsumer, (void (*)(void*))tDeleteSMqConsumerEp); + pSub->lostConsumer = NULL; + } } typedef struct SMqCGroup { diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 0da5b5f27ef43116969ddea115581b9e9164aa4e..47d7e14e45986f1e2ca30893da9a331e2ab4cb81 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -292,7 +292,7 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas return -1; } - SMqConsumerEp CEp; + SMqConsumerEp CEp = {0}; CEp.status = 0; CEp.consumerId = -1; CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1;