diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 8d518e94062d92b9658441cf0725ceed403a6e97..eb81e8cbad49a67c7d7f807bf10c01aaab15d935 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -695,11 +695,13 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { if (taosArrayGetSize(tmq->clientTopics) == 0) { tscDebug("consumer:%ld poll but not assigned", tmq->consumerId); + printf("over1\n"); usleep(blocking_time * 1000); return NULL; } SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, tmq->nextTopicIdx); if (taosArrayGetSize(pTopic->vgs) == 0) { + printf("over2\n"); usleep(blocking_time * 1000); return NULL; } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index c16a621b0dd7810d697243e2c3473d9d110a7b13..840326d3183bd889ef71368bd6c32dee9dc20e74 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -424,7 +424,9 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb || (vgThisConsumerAfterRb != 0 && status != MQ_CONSUMER_STATUS__ACTIVE) || (vgThisConsumerAfterRb == 0 && status != MQ_CONSUMER_STATUS__LOST)) { - pRebConsumer->epoch++; + if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb) { + pRebConsumer->epoch++; + } if (vgThisConsumerAfterRb != 0) { atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__ACTIVE); } else { @@ -1036,7 +1038,6 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { taosArrayPush(pSub->consumers, &mqSubConsumer); // if have un assigned vg, assign one to the consumer -#if 0 if (taosArrayGetSize(pSub->unassignedVg) > 0) { SMqConsumerEp *pConsumerEp = taosArrayPop(pSub->unassignedVg); pConsumerEp->oldConsumerId = pConsumerEp->consumerId; @@ -1047,10 +1048,9 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { } else { mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp); } - // do not set status active to trigger rebalance + // to trigger rebalance at once, do not set status active /*atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);*/ } -#endif SSdbRaw *pRaw = mndSubActionEncode(pSub); sdbSetRawStatus(pRaw, SDB_STATUS_READY);