提交 bb1e24dd 编写于 作者: D dapan1121

Merge remote-tracking branch 'origin/main' into fix/m23.0

...@@ -160,9 +160,9 @@ static const SSysDbTableSchema streamSchema[] = { ...@@ -160,9 +160,9 @@ static const SSysDbTableSchema streamSchema[] = {
static const SSysDbTableSchema streamTaskSchema[] = { static const SSysDbTableSchema streamTaskSchema[] = {
{.name = "stream_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "stream_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "task_id", .bytes = 8, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, {.name = "task_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "node_type", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "node_type", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "node_id", .bytes = 8, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, {.name = "node_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "level", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "level", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
}; };
......
...@@ -25,6 +25,7 @@ extern "C" { ...@@ -25,6 +25,7 @@ extern "C" {
int32_t mndInitSubscribe(SMnode *pMnode); int32_t mndInitSubscribe(SMnode *pMnode);
void mndCleanupSubscribe(SMnode *pMnode); void mndCleanupSubscribe(SMnode *pMnode);
int32_t mndGetGroupNumByTopic(SMnode *pMnode, const char *topicName);
SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, const char *CGroup, const char *topicName); SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, const char *CGroup, const char *topicName);
SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char *key); SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char *key);
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub); void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub);
......
...@@ -666,18 +666,12 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { ...@@ -666,18 +666,12 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
int32_t newTopicNum = taosArrayGetSize(pTopicList); int32_t newTopicNum = taosArrayGetSize(pTopicList);
for(int i = 0; i < newTopicNum; i++){ for(int i = 0; i < newTopicNum; i++){
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, (const char*)cgroup, (const char*)taosArrayGetP(pTopicList, i)); int32_t gNum = mndGetGroupNumByTopic(pMnode, (const char*)taosArrayGetP(pTopicList, i));
if(pSub == NULL) continue; if(gNum >= MND_MAX_GROUP_PER_TOPIC){
taosRLockLatch(&pSub->lock);
if(taosHashGetSize(pSub->consumerHash) > MND_MAX_GROUP_PER_TOPIC){
terrno = TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE; terrno = TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE;
code = terrno; code = terrno;
taosRUnLockLatch(&pSub->lock);
mndReleaseSubscribe(pMnode, pSub);
goto _over; goto _over;
} }
taosRUnLockLatch(&pSub->lock);
mndReleaseSubscribe(pMnode, pSub);
} }
// check topic existence // check topic existence
......
...@@ -992,6 +992,32 @@ SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char *key) { ...@@ -992,6 +992,32 @@ SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char *key) {
return pSub; return pSub;
} }
int32_t mndGetGroupNumByTopic(SMnode *pMnode, const char *topicName) {
int32_t num = 0;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SMqSubscribeObj *pSub = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
if (pIter == NULL) break;
char topic[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CGROUP_LEN];
mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
if (strcmp(topic, topicName) != 0) {
sdbRelease(pSdb, pSub);
continue;
}
num++;
sdbRelease(pSdb, pSub);
}
return num;
}
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) { void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
sdbRelease(pSdb, pSub); sdbRelease(pSdb, pSub);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册