提交 4b95ea9c 编写于 作者: wmmhello's avatar wmmhello

fix:clear consumer if normal close

上级 0ae6cd7a
...@@ -220,10 +220,10 @@ static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) { ...@@ -220,10 +220,10 @@ static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
mInfo("consumer:0x%" PRIx64 " needs to be cleared, status %s", pClearMsg->consumerId, mInfo("consumer:0x%" PRIx64 " needs to be cleared, status %s", pClearMsg->consumerId,
mndConsumerStatusName(pConsumer->status)); mndConsumerStatusName(pConsumer->status));
if (pConsumer->status != MQ_CONSUMER_STATUS_LOST) { // if (pConsumer->status != MQ_CONSUMER_STATUS_LOST) {
mndReleaseConsumer(pMnode, pConsumer); // mndReleaseConsumer(pMnode, pConsumer);
return -1; // return -1;
} // }
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup); SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
// pConsumerNew->updateType = CONSUMER_UPDATE_TIMER_LOST; // pConsumerNew->updateType = CONSUMER_UPDATE_TIMER_LOST;
...@@ -316,22 +316,9 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { ...@@ -316,22 +316,9 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
hbStatus); hbStatus);
if (status == MQ_CONSUMER_STATUS_READY) { if (status == MQ_CONSUMER_STATUS_READY) {
if (hbStatus > MND_CONSUMER_LOST_HB_CNT) { if (taosArrayGetSize(pConsumer->assignedTopics) == 0) { // unsubscribe or close
// SMqConsumerLostMsg *pLostMsg = rpcMallocCont(sizeof(SMqConsumerLostMsg)); mndDropConsumerFromSdb(pMnode, pConsumer->consumerId);
// if (pLostMsg == NULL) { } else if (hbStatus > MND_CONSUMER_LOST_HB_CNT) {
// mError("consumer:0x%"PRIx64" failed to transfer consumer status to lost due to out of memory. alloc size:%d",
// pConsumer->consumerId, (int32_t)sizeof(SMqConsumerLostMsg));
// continue;
// }
//
// pLostMsg->consumerId = pConsumer->consumerId;
// SRpcMsg rpcMsg = {
// .msgType = TDMT_MND_TMQ_CONSUMER_LOST, .pCont = pLostMsg, .contLen = sizeof(SMqConsumerLostMsg)};
//
// mDebug("consumer:0x%"PRIx64" hb not received beyond threshold %d, set to lost", pConsumer->consumerId,
// MND_CONSUMER_LOST_HB_CNT);
// tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
taosRLockLatch(&pConsumer->lock); taosRLockLatch(&pConsumer->lock);
int32_t topicNum = taosArrayGetSize(pConsumer->currentTopics); int32_t topicNum = taosArrayGetSize(pConsumer->currentTopics);
for (int32_t i = 0; i < topicNum; i++) { for (int32_t i = 0; i < topicNum; i++) {
...@@ -344,8 +331,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { ...@@ -344,8 +331,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
taosRUnLockLatch(&pConsumer->lock); taosRUnLockLatch(&pConsumer->lock);
} }
} else if (status == MQ_CONSUMER_STATUS_LOST) { } else if (status == MQ_CONSUMER_STATUS_LOST) {
// if the client is lost longer than one day, clear it. Otherwise, do nothing about the lost consumers. if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) { // clear consumer if lost a day
if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD || taosArrayGetSize(pConsumer->assignedTopics) == 0) { // clear consumer if lost a day or unsubscribe/close
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId); mndDropConsumerFromSdb(pMnode, pConsumer->consumerId);
} }
} else { // MQ_CONSUMER_STATUS_REBALANCE } else { // MQ_CONSUMER_STATUS_REBALANCE
......
...@@ -175,7 +175,7 @@ static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubsc ...@@ -175,7 +175,7 @@ static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubsc
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
if (pVgObj == NULL) { if (pVgObj == NULL) {
taosMemoryFree(buf); taosMemoryFree(buf);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_MND_VGROUP_NOT_EXIST;
return -1; return -1;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册