diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index ce6bbf999dc4989ce1463dcb202f5ad982ead884..a7c2e1e2c63224b622dd5e31fa96d2f30eb8b35f 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -220,10 +220,10 @@ static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) { mInfo("consumer:0x%" PRIx64 " needs to be cleared, status %s", pClearMsg->consumerId, mndConsumerStatusName(pConsumer->status)); - if (pConsumer->status != MQ_CONSUMER_STATUS_LOST) { - mndReleaseConsumer(pMnode, pConsumer); - return -1; - } +// if (pConsumer->status != MQ_CONSUMER_STATUS_LOST) { +// mndReleaseConsumer(pMnode, pConsumer); +// return -1; +// } SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup); // pConsumerNew->updateType = CONSUMER_UPDATE_TIMER_LOST; @@ -316,22 +316,9 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { hbStatus); if (status == MQ_CONSUMER_STATUS_READY) { - if (hbStatus > MND_CONSUMER_LOST_HB_CNT) { -// SMqConsumerLostMsg *pLostMsg = rpcMallocCont(sizeof(SMqConsumerLostMsg)); -// if (pLostMsg == NULL) { -// mError("consumer:0x%"PRIx64" failed to transfer consumer status to lost due to out of memory. alloc size:%d", -// pConsumer->consumerId, (int32_t)sizeof(SMqConsumerLostMsg)); -// continue; -// } -// -// pLostMsg->consumerId = pConsumer->consumerId; -// SRpcMsg rpcMsg = { -// .msgType = TDMT_MND_TMQ_CONSUMER_LOST, .pCont = pLostMsg, .contLen = sizeof(SMqConsumerLostMsg)}; -// -// mDebug("consumer:0x%"PRIx64" hb not received beyond threshold %d, set to lost", pConsumer->consumerId, -// MND_CONSUMER_LOST_HB_CNT); -// tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); - + if (taosArrayGetSize(pConsumer->assignedTopics) == 0) { // unsubscribe or close + mndDropConsumerFromSdb(pMnode, pConsumer->consumerId); + } else if (hbStatus > MND_CONSUMER_LOST_HB_CNT) { taosRLockLatch(&pConsumer->lock); int32_t topicNum = taosArrayGetSize(pConsumer->currentTopics); for (int32_t i = 0; i < topicNum; i++) { @@ -344,8 +331,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { taosRUnLockLatch(&pConsumer->lock); } } else if (status == MQ_CONSUMER_STATUS_LOST) { - // if the client is lost longer than one day, clear it. Otherwise, do nothing about the lost consumers. - if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD || taosArrayGetSize(pConsumer->assignedTopics) == 0) { // clear consumer if lost a day or unsubscribe/close + if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) { // clear consumer if lost a day mndDropConsumerFromSdb(pMnode, pConsumer->consumerId); } } else { // MQ_CONSUMER_STATUS_REBALANCE diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 4c6a60d8f75ccd7a690c734a91d631c3c6e4c997..ce3a4ea048c18d185a0397208a7d650fdca02fff 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -175,7 +175,7 @@ static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubsc SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); if (pVgObj == NULL) { taosMemoryFree(buf); - terrno = TSDB_CODE_OUT_OF_MEMORY; + terrno = TSDB_CODE_MND_VGROUP_NOT_EXIST; return -1; }