diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index a4af40285e7084db597384dfe4295df6032ce8ea..6603c4019f5c92111571633aa0908f4582485e74 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -565,6 +565,7 @@ int32_t tmqAsyncAskEp(tmq_t* tmq, bool wait) { int32_t tlen = sizeof(SMqCMGetSubEpReq); SMqCMGetSubEpReq* buf = malloc(tlen); if (buf == NULL) { + goto END; tscError("failed to malloc get subscribe ep buf"); } buf->consumerId = htobe64(tmq->consumerId); @@ -572,6 +573,7 @@ int32_t tmqAsyncAskEp(tmq_t* tmq, bool wait) { SRequestObj *pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_GET_SUB_EP); if (pRequest == NULL) { + goto END; tscError("failed to malloc subscribe ep request"); } @@ -579,7 +581,6 @@ int32_t tmqAsyncAskEp(tmq_t* tmq, bool wait) { SMqAskEpCbParam *pParam = malloc(sizeof(SMqAskEpCbParam)); if (pParam == NULL) { - free(buf); goto END; } pParam->tmq = tmq; @@ -596,6 +597,7 @@ int32_t tmqAsyncAskEp(tmq_t* tmq, bool wait) { asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); END: + tfree(buf); if (wait) tsem_wait(&tmq->rspSem); return 0; } diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 9cce25ed6aaa5e03dc4552e04e9edf207fa3db6d..1b7383bee19f4a2887cd0af723bf488185d61d10 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -393,6 +393,12 @@ static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsu return buf; } +static FORCE_INLINE void tDeleteSMqConsumerEp(SMqConsumerEp* pConsumerEp) { + if (pConsumerEp) { + tfree(pConsumerEp->qmsg); + } +} + // unit for rebalance typedef struct SMqSubscribeObj { char key[TSDB_SUBSCRIBE_KEY_LEN]; @@ -571,6 +577,14 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub) return buf; } +static FORCE_INLINE void tDeleteSMqSubscribeObj(SMqSubscribeObj* pSub) { + if (pSub->availConsumer) taosArrayDestroy(pSub->availConsumer); + if (pSub->assigned) taosArrayDestroyEx(pSub->assigned, (void (*)(void*))tDeleteSMqConsumerEp); + if (pSub->unassignedVg) taosArrayDestroyEx(pSub->unassignedVg, (void (*)(void*))tDeleteSMqConsumerEp); + if (pSub->idleConsumer) taosArrayDestroyEx(pSub->idleConsumer, (void (*)(void*))tDeleteSMqConsumerEp); + if (pSub->lostConsumer) taosArrayDestroyEx(pSub->lostConsumer, (void (*)(void*))tDeleteSMqConsumerEp); +} + typedef struct SMqCGroup { char name[TSDB_CONSUMER_GROUP_LEN]; int32_t status; // 0 - uninitialized, 1 - wait rebalance, 2- normal diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 62083a3d0333d25b16b942fad0561009bd1fb620..0da5b5f27ef43116969ddea115581b9e9164aa4e 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -449,6 +449,7 @@ static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub) { static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub) { mTrace("subscribe:%s, perform delete action", pSub->key); + tDeleteSMqSubscribeObj(pSub); return 0; }