diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 49921c9a1a0572c8498ca1ff01fb5491b23cd595..61e446c90476e7e9a7fa3b7beed207e8f1d7e260 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -339,13 +339,13 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR mInfo("sub:%s mq re-balance %d vgroups, existed consumers:%d, added:%d, removed:%d", pSubKey, totalVgNum, pInput->oldConsumerNum, numOfAdded, numOfRemoved); - // 1. build temporary hash(vgId -> SMqRebOutputVg) to store modified vg + // 1. build temporary hash(vgId -> SMqRebOutputVg) to store vg that need to be assigned SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); - // 2. check and get actual removed consumers, put their vg into hash + // 2. check and get actual removed consumers, put their vg into pHash doRemoveExistedConsumers(pOutput, pHash, pInput); - // 3. if previously no consumer, there are vgs not assigned + // 3. if previously no consumer, there are vgs not assigned, put these vg into pHash addUnassignedVgroups(pOutput, pHash); // 4. calc vg number of each consumer @@ -364,19 +364,17 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR mInfo("sub:%s no consumer subscribe this topic", pSubKey); } - // 5. first scan: remove vgroups from te consumers, who have more vgroups than the threashold value that is - // minVgCnt, and then put them into the recycled hash list + // 5. remove vgroups from consumers who have more vgroups than the threshold value(minVgCnt or minVgCnt + 1), and then another vg into pHash transferVgroupsForConsumers(pOutput, pHash, minVgCnt, imbConsumerNum); // 6. add new consumer into sub doAddNewConsumers(pOutput, pInput); - // 7. second scan: find consumer do not have enough vgroups, extract from temporary hash and assign to them - // All related vg should be put into rebVgs SMqRebOutputVg *pRebVg = NULL; void *pRemovedIter = NULL; void *pIter = NULL; + // 7. extract bgroups from pHash and assign to consumers that do not have enough vgroups while (1) { pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); if (pIter == NULL) { @@ -390,68 +388,55 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR // iter hash and find one vg pRemovedIter = taosHashIterate(pHash, pRemovedIter); if (pRemovedIter == NULL) { - mError("sub:%s removed iter is null", pSubKey); + mError("sub:%s removed iter is null, never can reach hear", pSubKey); break; } pRebVg = (SMqRebOutputVg *)pRemovedIter; - // push - taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp); pRebVg->newConsumerId = pConsumerEp->consumerId; - taosArrayPush(pOutput->rebVgs, pRebVg); - mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " (second scan) (not enough)", pRebVg->pVgEp->vgId, - pConsumerEp->consumerId); + taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp); + mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average", pRebVg->pVgEp->vgId, pConsumerEp->consumerId); } } - // 7. handle unassigned vg - if (taosHashGetSize(pOutput->pSub->consumerHash) != 0) { - // if has consumer, assign all left vg + if(imbConsumerNum != 0) { while (1) { - SMqConsumerEp *pConsumerEp = NULL; - pRemovedIter = taosHashIterate(pHash, pRemovedIter); - if (pRemovedIter == NULL) { - if (pIter != NULL) { - taosHashCancelIterate(pOutput->pSub->consumerHash, pIter); - pIter = NULL; - } + pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); + if (pIter == NULL) { break; } - while (1) { - pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); - pConsumerEp = (SMqConsumerEp *)pIter; + SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; - if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) { + if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) { + pRemovedIter = taosHashIterate(pHash, pRemovedIter); + if (pRemovedIter == NULL) { + mError("sub:%s removed iter is null", pSubKey); break; } + + pRebVg = (SMqRebOutputVg *)pRemovedIter; + pRebVg->newConsumerId = pConsumerEp->consumerId; + taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp); + mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average + 1", pRebVg->pVgEp->vgId, pConsumerEp->consumerId); } - pRebVg = (SMqRebOutputVg *)pRemovedIter; - taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp); - pRebVg->newConsumerId = pConsumerEp->consumerId; - if (pRebVg->newConsumerId == pRebVg->oldConsumerId) { - mInfo("mq rebalance: skip vg %d for same consumer:0x%" PRIx64 " (second scan)", pRebVg->pVgEp->vgId, - pConsumerEp->consumerId); - continue; - } - taosArrayPush(pOutput->rebVgs, pRebVg); - mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " (second scan) (unassigned)", pRebVg->pVgEp->vgId, - pConsumerEp->consumerId); } - } else { - // if all consumer is removed, put all vg into unassigned - pIter = NULL; - SMqRebOutputVg *pRebOutput = NULL; - while (1) { - pIter = taosHashIterate(pHash, pIter); - if (pIter == NULL) { - break; - } - pRebOutput = (SMqRebOutputVg *)pIter; + } + + // All assigned vg should be put into pOutput->rebVgs + if(pRemovedIter != NULL){ + mError("sub:%s pRemovedIter should be NULL", pSubKey); + } + while (1) { + pRemovedIter = taosHashIterate(pHash, pRemovedIter); + if (pRemovedIter == NULL) { + break; + } + SMqRebOutputVg* pRebOutput = (SMqRebOutputVg *)pRemovedIter; + taosArrayPush(pOutput->rebVgs, pRebOutput); + if(taosHashGetSize(pOutput->pSub->consumerHash) == 0){ // if all consumer is removed, put all vg into unassigned taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp); - taosArrayPush(pOutput->rebVgs, pRebOutput); - mInfo("sub:%s mq re-balance unassign vgId:%d (second scan)", pSubKey, pRebOutput->pVgEp->vgId); } } @@ -462,19 +447,18 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR mInfo("sub:%s mq re-balance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, pSubKey, pOutputRebVg->pVgEp->vgId, pOutputRebVg->oldConsumerId, pOutputRebVg->newConsumerId); } - { - pIter = NULL; - while (1) { - pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); - if (pIter == NULL) break; - SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; - int32_t sz = taosArrayGetSize(pConsumerEp->vgs); - mInfo("sub:%s mq re-balance final cfg: consumer:0x%" PRIx64 " has %d vg", pSubKey, pConsumerEp->consumerId, sz); - for (int32_t i = 0; i < sz; i++) { - SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i); - mInfo("sub:%s mq re-balance final cfg: vg %d to consumer:0x%" PRIx64, pSubKey, pVgEp->vgId, - pConsumerEp->consumerId); - } + + pIter = NULL; + while (1) { + pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); + if (pIter == NULL) break; + SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; + int32_t sz = taosArrayGetSize(pConsumerEp->vgs); + mInfo("sub:%s mq re-balance final cfg: consumer:0x%" PRIx64 " has %d vg", pSubKey, pConsumerEp->consumerId, sz); + for (int32_t i = 0; i < sz; i++) { + SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i); + mInfo("sub:%s mq re-balance final cfg: vg %d to consumer:0x%" PRIx64, pSubKey, pVgEp->vgId, + pConsumerEp->consumerId); } } @@ -653,13 +637,13 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { mndReleaseTopic(pMnode, pTopic); rebInput.oldConsumerNum = 0; - mInfo("topic:%s has no consumers sub yet", topic); + mInfo("sub topic:%s has no consumers sub yet", pRebInfo->key); } else { taosRLockLatch(&pSub->lock); rebInput.oldConsumerNum = taosHashGetSize(pSub->consumerHash); rebOutput.pSub = tCloneSubscribeObj(pSub); taosRUnLockLatch(&pSub->lock); - mInfo("topic:%s has %d consumers sub till now", pRebInfo->key, rebInput.oldConsumerNum); + mInfo("sub topic:%s has %d consumers sub till now", pRebInfo->key, rebInput.oldConsumerNum); mndReleaseSubscribe(pMnode, pSub); }