From 779e35b679582af40e13e65aa6e2dcb8918f4425 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 16 Jun 2023 13:52:08 +0800 Subject: [PATCH] fix:drop consumer wher drop topic or group --- source/dnode/mnode/impl/inc/mndConsumer.h | 1 + source/dnode/mnode/impl/src/mndConsumer.c | 31 ++++++++++++---------- source/dnode/mnode/impl/src/mndSubscribe.c | 5 +--- source/dnode/mnode/impl/src/mndTopic.c | 5 ++++ 4 files changed, 24 insertions(+), 18 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndConsumer.h b/source/dnode/mnode/impl/inc/mndConsumer.h index 5f35be869e..a3a31cfc5a 100644 --- a/source/dnode/mnode/impl/inc/mndConsumer.h +++ b/source/dnode/mnode/impl/inc/mndConsumer.h @@ -33,6 +33,7 @@ enum { int32_t mndInitConsumer(SMnode *pMnode); void mndCleanupConsumer(SMnode *pMnode); +void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId); SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId); void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer); diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 0ae38e00f0..3eb33bf4a9 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -75,6 +75,22 @@ int32_t mndInitConsumer(SMnode *pMnode) { void mndCleanupConsumer(SMnode *pMnode) {} +void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId){ + SMqConsumerClearMsg *pClearMsg = rpcMallocCont(sizeof(SMqConsumerClearMsg)); + if (pClearMsg == NULL) { + mError("consumer:0x%"PRIx64" failed to clear consumer due to out of memory. alloc size:%d", consumerId, (int32_t)sizeof(SMqConsumerClearMsg)); + return; + } + + pClearMsg->consumerId = consumerId; + SRpcMsg rpcMsg = { + .msgType = TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, .pCont = pClearMsg, .contLen = sizeof(SMqConsumerClearMsg)}; + + mInfo("consumer:0x%" PRIx64 " drop from sdb", consumerId); + tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); + return; +} + bool mndRebTryStart() { int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1); mDebug("tq timer, rebalance counter old val:%d", old); @@ -330,20 +346,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { } else if (status == MQ_CONSUMER_STATUS_LOST) { // if the client is lost longer than one day, clear it. Otherwise, do nothing about the lost consumers. if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD || taosArrayGetSize(pConsumer->assignedTopics) == 0) { // clear consumer if lost a day or unsubscribe/close - SMqConsumerClearMsg *pClearMsg = rpcMallocCont(sizeof(SMqConsumerClearMsg)); - if (pClearMsg == NULL) { - mError("consumer:0x%"PRIx64" failed to clear consumer due to out of memory. alloc size:%d", - pConsumer->consumerId, (int32_t)sizeof(SMqConsumerClearMsg)); - continue; - } - - pClearMsg->consumerId = pConsumer->consumerId; - SRpcMsg rpcMsg = { - .msgType = TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, .pCont = pClearMsg, .contLen = sizeof(SMqConsumerClearMsg)}; - - mDebug("consumer:0x%" PRIx64 " lost beyond threshold %d, clear it", pConsumer->consumerId, - MND_CONSUMER_LOST_CLEAR_THRESHOLD); - tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); + mndDropConsumerFromSdb(pMnode, pConsumer->consumerId); } } else { // MQ_CONSUMER_STATUS_REBALANCE taosRLockLatch(&pConsumer->lock); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 6b6e9fab77..4c6a60d8f7 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -760,10 +760,7 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { } if (strcmp(dropReq.cgroup, pConsumer->cgroup) == 0) { - sdbRelease(pMnode->pSdb, pConsumer); - terrno = TSDB_CODE_MND_CGROUP_USED; - mError("cgroup:%s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr()); - return -1; + mndDropConsumerFromSdb(pMnode, pConsumer->consumerId); } sdbRelease(pMnode->pSdb, pConsumer); } diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 2a632f6b2d..01241c9339 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -697,6 +697,11 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { break; } + if (pConsumer->status == MQ_CONSUMER_STATUS_LOST){ + mndDropConsumerFromSdb(pMnode, pConsumer->consumerId); + continue; + } + int32_t sz = taosArrayGetSize(pConsumer->assignedTopics); for (int32_t i = 0; i < sz; i++) { char *name = taosArrayGetP(pConsumer->assignedTopics, i); -- GitLab