From f65cd36e80b84dfe7c1ff212c35a57fe82fc59ec Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sun, 16 Apr 2023 00:54:11 +0800 Subject: [PATCH] opti:the logic of mndDoRebalance for clear --- source/dnode/mnode/impl/src/mndSubscribe.c | 37 ++++++++++------------ 1 file changed, 17 insertions(+), 20 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 61e446c904..9f245611a3 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -269,7 +269,7 @@ static void addUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) { }; taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &rebOutput, sizeof(SMqRebOutputVg)); - mInfo("sub:%s mq re-balance remove vgId:%d from unassigned", pSubKey, pVgEp->vgId); + mInfo("sub:%s mq re-balance addUnassignedVgroups vgId:%d from unassigned", pSubKey, pVgEp->vgId); } } @@ -399,33 +399,30 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR } } - if(imbConsumerNum != 0) { - while (1) { - pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); - if (pIter == NULL) { + while (1) { + pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); + if (pIter == NULL) { + break; + } + SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; + + if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) { + pRemovedIter = taosHashIterate(pHash, pRemovedIter); + if (pRemovedIter == NULL) { + mError("sub:%s removed iter is null", pSubKey); break; } - SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; - - if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) { - pRemovedIter = taosHashIterate(pHash, pRemovedIter); - if (pRemovedIter == NULL) { - mError("sub:%s removed iter is null", pSubKey); - break; - } - pRebVg = (SMqRebOutputVg *)pRemovedIter; - pRebVg->newConsumerId = pConsumerEp->consumerId; - taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp); - mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average + 1", pRebVg->pVgEp->vgId, pConsumerEp->consumerId); - } + pRebVg = (SMqRebOutputVg *)pRemovedIter; + pRebVg->newConsumerId = pConsumerEp->consumerId; + taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp); + mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average + 1", pRebVg->pVgEp->vgId, pConsumerEp->consumerId); } - } // All assigned vg should be put into pOutput->rebVgs if(pRemovedIter != NULL){ - mError("sub:%s pRemovedIter should be NULL", pSubKey); + mError("sub:%s error pRemovedIter should be NULL", pSubKey); } while (1) { pRemovedIter = taosHashIterate(pHash, pRemovedIter); -- GitLab