diff --git a/source/common/src/systable.c b/source/common/src/systable.c index f6028439d72192e406520b57df7c4d0ff5471c97..c2024a9a779661eb1876525be29497664f9c6eaa 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -160,9 +160,9 @@ static const SSysDbTableSchema streamSchema[] = { static const SSysDbTableSchema streamTaskSchema[] = { {.name = "stream_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, - {.name = "task_id", .bytes = 8, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, + {.name = "task_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, {.name = "node_type", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, - {.name = "node_id", .bytes = 8, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, + {.name = "node_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, {.name = "level", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, }; diff --git a/source/dnode/mnode/impl/inc/mndSubscribe.h b/source/dnode/mnode/impl/inc/mndSubscribe.h index fad316ea12edce96bde4c21694b5402d97bf4ae0..ba4328b8fe821e0b8f858fb69d2deb687c36ac93 100644 --- a/source/dnode/mnode/impl/inc/mndSubscribe.h +++ b/source/dnode/mnode/impl/inc/mndSubscribe.h @@ -25,6 +25,7 @@ extern "C" { int32_t mndInitSubscribe(SMnode *pMnode); void mndCleanupSubscribe(SMnode *pMnode); +int32_t mndGetGroupNumByTopic(SMnode *pMnode, const char *topicName); SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, const char *CGroup, const char *topicName); SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char *key); void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub); diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 9b4b9ac8c538294c065ba57a500ea3401c95b4ba..7d2b8acf4ed7993797b40b8f277f2cfe6e0c097a 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -666,18 +666,12 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { int32_t newTopicNum = taosArrayGetSize(pTopicList); for(int i = 0; i < newTopicNum; i++){ - SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, (const char*)cgroup, (const char*)taosArrayGetP(pTopicList, i)); - if(pSub == NULL) continue; - taosRLockLatch(&pSub->lock); - if(taosHashGetSize(pSub->consumerHash) > MND_MAX_GROUP_PER_TOPIC){ + int32_t gNum = mndGetGroupNumByTopic(pMnode, (const char*)taosArrayGetP(pTopicList, i)); + if(gNum >= MND_MAX_GROUP_PER_TOPIC){ terrno = TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE; code = terrno; - taosRUnLockLatch(&pSub->lock); - mndReleaseSubscribe(pMnode, pSub); goto _over; } - taosRUnLockLatch(&pSub->lock); - mndReleaseSubscribe(pMnode, pSub); } // check topic existence diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 6bbb8cf32c3fee60b5483f1863b2f135130868ab..893c38027690d25e71e92f9a3c225c2c1cc76f31 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -992,6 +992,32 @@ SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char *key) { return pSub; } +int32_t mndGetGroupNumByTopic(SMnode *pMnode, const char *topicName) { + int32_t num = 0; + SSdb *pSdb = pMnode->pSdb; + + void *pIter = NULL; + SMqSubscribeObj *pSub = NULL; + while (1) { + pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub); + if (pIter == NULL) break; + + + char topic[TSDB_TOPIC_FNAME_LEN]; + char cgroup[TSDB_CGROUP_LEN]; + mndSplitSubscribeKey(pSub->key, topic, cgroup, true); + if (strcmp(topic, topicName) != 0) { + sdbRelease(pSdb, pSub); + continue; + } + + num++; + sdbRelease(pSdb, pSub); + } + + return num; +} + void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) { SSdb *pSdb = pMnode->pSdb; sdbRelease(pSdb, pSub);