diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index f1b870cbf37684066bc5b3592740f4cc7a9008c8..cfc164c26c36a3c871963ddd7c68a216139c02fb 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -484,16 +484,28 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; if (init) { taosArrayAddAll(pOutput->pSub->offsetRows, pConsumerEp->offsetRows); -// mDebug("pSub->offsetRows is init"); + mInfo("pSub->offsetRows is init"); } else { + SMqConsumerEp *pConsumerEpNew = taosHashGet(pOutput->pSub->consumerHash, &pConsumerEp->consumerId, sizeof(int64_t)); + for (int j = 0; j < taosArrayGetSize(pConsumerEp->offsetRows); j++) { OffsetRows *d1 = taosArrayGet(pConsumerEp->offsetRows, j); + bool jump = false; + for (int i = 0; pConsumerEpNew && i < taosArrayGetSize(pConsumerEpNew->vgs); i++){ + SMqVgEp *pVgEp = taosArrayGetP(pConsumerEpNew->vgs, j); + if(pVgEp->vgId == d1->vgId){ + jump = true; + mInfo("pSub->offsetRows jump, because consumer id:%"PRIx64 " and vgId:%d not change", pConsumerEp->consumerId, pVgEp->vgId); + break; + } + } + if(jump) continue; for (int i = 0; i < taosArrayGetSize(pOutput->pSub->offsetRows); i++) { OffsetRows *d2 = taosArrayGet(pOutput->pSub->offsetRows, i); if (d1->vgId == d2->vgId) { d2->rows += d1->rows; d2->offset = d1->offset; -// mDebug("pSub->offsetRows add vgId:%d, after:%"PRId64", before:%"PRId64, d2->vgId, d2->rows, d1->rows); + mInfo("pSub->offsetRows add vgId:%d, after:%"PRId64", before:%"PRId64, d2->vgId, d2->rows, d1->rows); } } }