From ce0b71c6349fabcc88bf6fdc6d0a93bd1c484762 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 28 Jun 2023 15:57:30 +0800 Subject: [PATCH] fix:add rows to pSub if rebalance --- source/dnode/mnode/impl/src/mndSubscribe.c | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index f1b870cbf3..cfc164c26c 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -484,16 +484,28 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; if (init) { taosArrayAddAll(pOutput->pSub->offsetRows, pConsumerEp->offsetRows); -// mDebug("pSub->offsetRows is init"); + mInfo("pSub->offsetRows is init"); } else { + SMqConsumerEp *pConsumerEpNew = taosHashGet(pOutput->pSub->consumerHash, &pConsumerEp->consumerId, sizeof(int64_t)); + for (int j = 0; j < taosArrayGetSize(pConsumerEp->offsetRows); j++) { OffsetRows *d1 = taosArrayGet(pConsumerEp->offsetRows, j); + bool jump = false; + for (int i = 0; pConsumerEpNew && i < taosArrayGetSize(pConsumerEpNew->vgs); i++){ + SMqVgEp *pVgEp = taosArrayGetP(pConsumerEpNew->vgs, j); + if(pVgEp->vgId == d1->vgId){ + jump = true; + mInfo("pSub->offsetRows jump, because consumer id:%"PRIx64 " and vgId:%d not change", pConsumerEp->consumerId, pVgEp->vgId); + break; + } + } + if(jump) continue; for (int i = 0; i < taosArrayGetSize(pOutput->pSub->offsetRows); i++) { OffsetRows *d2 = taosArrayGet(pOutput->pSub->offsetRows, i); if (d1->vgId == d2->vgId) { d2->rows += d1->rows; d2->offset = d1->offset; -// mDebug("pSub->offsetRows add vgId:%d, after:%"PRId64", before:%"PRId64, d2->vgId, d2->rows, d1->rows); + mInfo("pSub->offsetRows add vgId:%d, after:%"PRId64", before:%"PRId64, d2->vgId, d2->rows, d1->rows); } } } -- GitLab