diff --git a/source/dnode/mnode/impl/inc/mndSubscribe.h b/source/dnode/mnode/impl/inc/mndSubscribe.h index fad316ea12edce96bde4c21694b5402d97bf4ae0..ba4328b8fe821e0b8f858fb69d2deb687c36ac93 100644 --- a/source/dnode/mnode/impl/inc/mndSubscribe.h +++ b/source/dnode/mnode/impl/inc/mndSubscribe.h @@ -25,6 +25,7 @@ extern "C" { int32_t mndInitSubscribe(SMnode *pMnode); void mndCleanupSubscribe(SMnode *pMnode); +int32_t mndGetGroupNumByTopic(SMnode *pMnode, const char *topicName); SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, const char *CGroup, const char *topicName); SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char *key); void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub); diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 4395446e813f1c70e8e16d4f281646d09ce17e3c..35ddf46b98d3411ec2967e1b772a9b6f432baf91 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -645,18 +645,12 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { int32_t newTopicNum = taosArrayGetSize(pTopicList); for(int i = 0; i < newTopicNum; i++){ - SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, (const char*)cgroup, (const char*)taosArrayGetP(pTopicList, i)); - if(pSub == NULL) continue; - taosRLockLatch(&pSub->lock); - if(taosHashGetSize(pSub->consumerHash) > MND_MAX_GROUP_PER_TOPIC){ + int32_t gNum = mndGetGroupNumByTopic(pMnode, (const char*)taosArrayGetP(pTopicList, i)); + if(gNum > MND_MAX_GROUP_PER_TOPIC){ terrno = TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE; code = terrno; - taosRUnLockLatch(&pSub->lock); - mndReleaseSubscribe(pMnode, pSub); goto _over; } - taosRUnLockLatch(&pSub->lock); - mndReleaseSubscribe(pMnode, pSub); } // check topic existence diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index ce3a4ea048c18d185a0397208a7d650fdca02fff..4b99ad624984adaf8ae0d997ad39b65378cc28e3 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -952,6 +952,32 @@ SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char *key) { return pSub; } +int32_t mndGetGroupNumByTopic(SMnode *pMnode, const char *topicName) { + int32_t num = 0; + SSdb *pSdb = pMnode->pSdb; + + void *pIter = NULL; + SMqSubscribeObj *pSub = NULL; + while (1) { + pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub); + if (pIter == NULL) break; + + + char topic[TSDB_TOPIC_FNAME_LEN]; + char cgroup[TSDB_CGROUP_LEN]; + mndSplitSubscribeKey(pSub->key, topic, cgroup, true); + if (strcmp(topic, topicName) != 0) { + sdbRelease(pSdb, pSub); + continue; + } + + num++; + sdbRelease(pSdb, pSub); + } + + return num; +} + void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) { SSdb *pSdb = pMnode->pSdb; sdbRelease(pSdb, pSub);