diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index af66f8d149dce7c769b18189f8d9c87199412b98..62beee03037a5b384a0e80a94c419610c647047c 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -570,8 +570,8 @@ typedef struct { SEpSet epSet; } SMqVgEp; -//SMqVgEp* tCloneSMqVgEp(const SMqVgEp* pVgEp); -//void tDeleteSMqVgEp(SMqVgEp* pVgEp); +SMqVgEp* tCloneSMqVgEp(const SMqVgEp* pVgEp); +void tDeleteSMqVgEp(SMqVgEp* pVgEp); int32_t tEncodeSMqVgEp(void** buf, const SMqVgEp* pVgEp); void* tDecodeSMqVgEp(const void* buf, SMqVgEp* pVgEp, int8_t sver); diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 15f0cc9b71cbe52b33104af214758f0554b901c5..f0fa40cacf2b6eb7f32a334e914f1c284f2db589 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -183,21 +183,21 @@ void tFreeStreamObj(SStreamObj *pStream) { } } -//SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) { -// SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp)); -// if (pVgEpNew == NULL) return NULL; -// pVgEpNew->vgId = pVgEp->vgId; -//// pVgEpNew->qmsg = taosStrdup(pVgEp->qmsg); -// pVgEpNew->epSet = pVgEp->epSet; -// return pVgEpNew; -//} +SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) { + SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp)); + if (pVgEpNew == NULL) return NULL; + pVgEpNew->vgId = pVgEp->vgId; +// pVgEpNew->qmsg = taosStrdup(pVgEp->qmsg); + pVgEpNew->epSet = pVgEp->epSet; + return pVgEpNew; +} -//void tDeleteSMqVgEp(SMqVgEp *pVgEp) { -// if (pVgEp) { -//// taosMemoryFreeClear(pVgEp->qmsg); -// taosMemoryFree(pVgEp); -// } -//} +void tDeleteSMqVgEp(SMqVgEp *pVgEp) { + if (pVgEp) { +// taosMemoryFreeClear(pVgEp->qmsg); + taosMemoryFree(pVgEp); + } +} int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) { int32_t tlen = 0; @@ -517,11 +517,11 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) { pConsumerEp = (SMqConsumerEp *)pIter; SMqConsumerEp newEp = { .consumerId = pConsumerEp->consumerId, - .vgs = taosArrayDup(pConsumerEp->vgs, NULL), + .vgs = taosArrayDup(pConsumerEp->vgs, (__array_item_dup_fn_t)tCloneSMqVgEp), }; taosHashPut(pSubNew->consumerHash, &newEp.consumerId, sizeof(int64_t), &newEp, sizeof(SMqConsumerEp)); } - pSubNew->unassignedVgs = taosArrayDup(pSub->unassignedVgs, NULL); + pSubNew->unassignedVgs = taosArrayDup(pSub->unassignedVgs, (__array_item_dup_fn_t)tCloneSMqVgEp); pSubNew->offsetRows = taosArrayDup(pSub->offsetRows, NULL); memcpy(pSubNew->dbName, pSub->dbName, TSDB_DB_FNAME_LEN); pSubNew->qmsg = taosStrdup(pSub->qmsg); @@ -534,11 +534,11 @@ void tDeleteSubscribeObj(SMqSubscribeObj *pSub) { pIter = taosHashIterate(pSub->consumerHash, pIter); if (pIter == NULL) break; SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; - taosArrayDestroy(pConsumerEp->vgs); + taosArrayDestroyP(pConsumerEp->vgs, (FDelete)tDeleteSMqVgEp); taosArrayDestroy(pConsumerEp->offsetRows); } taosHashCleanup(pSub->consumerHash); - taosArrayDestroy(pSub->unassignedVgs); + taosArrayDestroyP(pSub->unassignedVgs, (FDelete)tDeleteSMqVgEp); taosMemoryFreeClear(pSub->qmsg); taosArrayDestroy(pSub->offsetRows); }