From 9cb9cb686a08129659a0ec08315a65a4721ee85c Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 16 Feb 2022 18:11:36 +0800 Subject: [PATCH] add eager rebalance back --- source/client/src/tmq.c | 2 ++ source/dnode/mnode/impl/src/mndSubscribe.c | 8 ++++---- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 8d518e9406..eb81e8cbad 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -695,11 +695,13 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { if (taosArrayGetSize(tmq->clientTopics) == 0) { tscDebug("consumer:%ld poll but not assigned", tmq->consumerId); + printf("over1\n"); usleep(blocking_time * 1000); return NULL; } SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, tmq->nextTopicIdx); if (taosArrayGetSize(pTopic->vgs) == 0) { + printf("over2\n"); usleep(blocking_time * 1000); return NULL; } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index c16a621b0d..840326d318 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -424,7 +424,9 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb || (vgThisConsumerAfterRb != 0 && status != MQ_CONSUMER_STATUS__ACTIVE) || (vgThisConsumerAfterRb == 0 && status != MQ_CONSUMER_STATUS__LOST)) { - pRebConsumer->epoch++; + if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb) { + pRebConsumer->epoch++; + } if (vgThisConsumerAfterRb != 0) { atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__ACTIVE); } else { @@ -1036,7 +1038,6 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { taosArrayPush(pSub->consumers, &mqSubConsumer); // if have un assigned vg, assign one to the consumer -#if 0 if (taosArrayGetSize(pSub->unassignedVg) > 0) { SMqConsumerEp *pConsumerEp = taosArrayPop(pSub->unassignedVg); pConsumerEp->oldConsumerId = pConsumerEp->consumerId; @@ -1047,10 +1048,9 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { } else { mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp); } - // do not set status active to trigger rebalance + // to trigger rebalance at once, do not set status active /*atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);*/ } -#endif SSdbRaw *pRaw = mndSubActionEncode(pSub); sdbSetRawStatus(pRaw, SDB_STATUS_READY); -- GitLab