diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 154973e20fee0c1a39453935fb80011ecd959d6f..a5b9d356f86ee71d7dcdb78d7c67a74f220eef02 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1744,6 +1744,10 @@ typedef struct SMqCMGetSubEpRsp { SArray* topics; // SArray } SMqCMGetSubEpRsp; +static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { + taosArrayDestroy(pSubTopicEp->vgs); +} + static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) { int32_t tlen = 0; tlen += taosEncodeFixedI32(buf, pVgEp->vgId); @@ -1757,6 +1761,10 @@ static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) { return buf; } +static FORCE_INLINE void tDeleteSMqCMGetSubEpRsp(SMqCMGetSubEpRsp* pRsp) { + taosArrayDestroyEx(pRsp->topics, (void (*)(void*)) tDeleteSMqSubTopicEp); +} + static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp) { int32_t tlen = 0; tlen += taosEncodeString(buf, pTopicEp->topic); diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index f9ade7287fc4a09ec62d6fc9ed4d46433dd5f5f2..ccdd1e64cb76dfeea1da51d1584ae024ec7f13e3 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -557,6 +557,7 @@ int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) { if (pParam->wait) { tsem_post(&tmq->rspSem); } + tDeleteSMqCMGetSubEpRsp(&rsp); free(pParam); return 0; } diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index e642b578fa6e0a1359a7a2ab1355b5602c192123..6fd61baee387814b2550c0523a46c6b8caaabc51 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -75,6 +75,7 @@ SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) { terrno = TSDB_CODE_SUCCESS; CM_ENCODE_OVER: + tfree(buf); if (terrno != 0) { mError("consumer:%ld, failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr()); sdbFreeRaw(pRaw); @@ -117,6 +118,7 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_SUCCESS; CM_DECODE_OVER: + tfree(buf); if (terrno != TSDB_CODE_SUCCESS) { mError("consumer:%ld, failed to decode from raw:%p since %s", pConsumer->consumerId, pRaw, terrstr()); tfree(pRow); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 47d7e14e45986f1e2ca30893da9a331e2ab4cb81..8e62d48e610d704a28768b4a36885bea78615f9e 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -388,6 +388,7 @@ static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) { terrno = TSDB_CODE_SUCCESS; SUB_ENCODE_OVER: + tfree(buf); if (terrno != 0) { mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr()); sdbFreeRaw(pRaw); @@ -431,10 +432,10 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_SUCCESS; SUB_DECODE_OVER: + tfree(buf); if (terrno != TSDB_CODE_SUCCESS) { mError("subscribe:%s, failed to decode from raw:%p since %s", pSub->key, pRaw, terrstr()); // TODO free subscribeobj - tfree(buf); tfree(pRow); return NULL; }