From 64d8d63f3dece0839a2093f962b4c3e1b29128df Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 29 Jun 2023 10:47:55 +0800 Subject: [PATCH] fix:error in limit for tmq group --- source/dnode/mnode/impl/inc/mndSubscribe.h | 1 + source/dnode/mnode/impl/src/mndConsumer.c | 10 ++------- source/dnode/mnode/impl/src/mndSubscribe.c | 26 ++++++++++++++++++++++ 3 files changed, 29 insertions(+), 8 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndSubscribe.h b/source/dnode/mnode/impl/inc/mndSubscribe.h index fad316ea12..ba4328b8fe 100644 --- a/source/dnode/mnode/impl/inc/mndSubscribe.h +++ b/source/dnode/mnode/impl/inc/mndSubscribe.h @@ -25,6 +25,7 @@ extern "C" { int32_t mndInitSubscribe(SMnode *pMnode); void mndCleanupSubscribe(SMnode *pMnode); +int32_t mndGetGroupNumByTopic(SMnode *pMnode, const char *topicName); SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, const char *CGroup, const char *topicName); SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char *key); void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub); diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 4395446e81..35ddf46b98 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -645,18 +645,12 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { int32_t newTopicNum = taosArrayGetSize(pTopicList); for(int i = 0; i < newTopicNum; i++){ - SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, (const char*)cgroup, (const char*)taosArrayGetP(pTopicList, i)); - if(pSub == NULL) continue; - taosRLockLatch(&pSub->lock); - if(taosHashGetSize(pSub->consumerHash) > MND_MAX_GROUP_PER_TOPIC){ + int32_t gNum = mndGetGroupNumByTopic(pMnode, (const char*)taosArrayGetP(pTopicList, i)); + if(gNum > MND_MAX_GROUP_PER_TOPIC){ terrno = TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE; code = terrno; - taosRUnLockLatch(&pSub->lock); - mndReleaseSubscribe(pMnode, pSub); goto _over; } - taosRUnLockLatch(&pSub->lock); - mndReleaseSubscribe(pMnode, pSub); } // check topic existence diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index ce3a4ea048..4b99ad6249 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -952,6 +952,32 @@ SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char *key) { return pSub; } +int32_t mndGetGroupNumByTopic(SMnode *pMnode, const char *topicName) { + int32_t num = 0; + SSdb *pSdb = pMnode->pSdb; + + void *pIter = NULL; + SMqSubscribeObj *pSub = NULL; + while (1) { + pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub); + if (pIter == NULL) break; + + + char topic[TSDB_TOPIC_FNAME_LEN]; + char cgroup[TSDB_CGROUP_LEN]; + mndSplitSubscribeKey(pSub->key, topic, cgroup, true); + if (strcmp(topic, topicName) != 0) { + sdbRelease(pSdb, pSub); + continue; + } + + num++; + sdbRelease(pSdb, pSub); + } + + return num; +} + void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) { SSdb *pSdb = pMnode->pSdb; sdbRelease(pSdb, pSub); -- GitLab