diff --git a/include/common/tglobal.h b/include/common/tglobal.h index b08916f891fba90f246206956b19741d916914cb..5dff313a9de8164cf26484a3e8ad11eec43e0167 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -162,6 +162,8 @@ extern char tsSmlTagName[]; // extern bool tsSmlDataFormat; // extern int32_t tsSmlBatchSize; +extern int32_t tmqMaxTopicNum; + // wal extern int64_t tsWalFsyncDataSizeLimit; diff --git a/include/util/taoserror.h b/include/util/taoserror.h index b1c8ada6a482f3217b2ba6c06a16b10130580f4c..6147ad682064a0cb0efded329e0fa64289d756f6 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -768,6 +768,8 @@ int32_t* taosGetErrno(); #define TSDB_CODE_TMQ_CONSUMER_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x4001) #define TSDB_CODE_TMQ_CONSUMER_CLOSED TAOS_DEF_ERROR_CODE(0, 0x4002) #define TSDB_CODE_TMQ_CONSUMER_ERROR TAOS_DEF_ERROR_CODE(0, 0x4003) +#define TSDB_CODE_TMQ_TOPIC_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x4004) +#define TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x4005) // stream #define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index dd64538f3d89a9a779ca511ccb35b8f6cd94b693..0f20d6fd2c9288f958a3c975ddc2a18a8b07142d 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -103,6 +103,8 @@ char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; // user defined child table // bool tsSmlDataFormat = false; // int32_t tsSmlBatchSize = 10000; +// tmq +int32_t tmqMaxTopicNum = 20; // query int32_t tsQueryPolicy = 1; int32_t tsQueryRspPolicy = 0; @@ -777,6 +779,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { tstrncpy(tsSmlTagName, cfgGetItem(pCfg, "smlTagName")->str, TSDB_COL_NAME_LEN); // tsSmlDataFormat = cfgGetItem(pCfg, "smlDataFormat")->bval; + tmqMaxTopicNum = cfgGetItem(pCfg, "tmqMaxTopicNum")->i32; // tsSmlBatchSize = cfgGetItem(pCfg, "smlBatchSize")->i32; tsMaxInsertBatchRows = cfgGetItem(pCfg, "maxInsertBatchRows")->i32; @@ -1196,6 +1199,8 @@ int32_t taosApplyLocalCfg(SConfig *pCfg, char *name) { cfgSetItem(pCfg, "secondEp", tsSecond, pSecondpItem->stype); } else if (strcasecmp("smlChildTableName", name) == 0) { tstrncpy(tsSmlChildTableName, cfgGetItem(pCfg, "smlChildTableName")->str, TSDB_TABLE_NAME_LEN); + } else if (strcasecmp("tmqMaxTopicNum", name) == 0) { + tmqMaxTopicNum = cfgGetItem(pCfg, "tmqMaxTopicNum")->i32; } else if (strcasecmp("smlTagName", name) == 0) { tstrncpy(tsSmlTagName, cfgGetItem(pCfg, "smlTagName")->str, TSDB_COL_NAME_LEN); // } else if (strcasecmp("smlDataFormat", name) == 0) { diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index a7c2e1e2c63224b622dd5e31fa96d2f30eb8b35f..1aed0ba3664340c0d1cd83ac03ddcd38917e27eb 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -26,6 +26,7 @@ #define MND_CONSUMER_VER_NUMBER 1 #define MND_CONSUMER_RESERVE_SIZE 64 +#define MND_MAX_GROUP_PER_TOPIC 100 #define MND_CONSUMER_LOST_HB_CNT 6 #define MND_CONSUMER_LOST_CLEAR_THRESHOLD 43200 @@ -635,6 +636,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { char *cgroup = subscribe.cgroup; SMqConsumerObj *pExistedConsumer = NULL; SMqConsumerObj *pConsumerNew = NULL; + STrans *pTrans = NULL; int32_t code = -1; SArray *pTopicList = subscribe.topicNames; @@ -642,9 +644,17 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { taosArrayRemoveDuplicate(pTopicList, taosArrayCompareString, freeItem); int32_t newTopicNum = taosArrayGetSize(pTopicList); + for(int i = 0; i < newTopicNum; i++){ + SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, (const char*)cgroup, (const char*)taosArrayGetP(pTopicList, i)); + if(pSub != NULL && taosHashGetSize(pSub->consumerHash) > MND_MAX_GROUP_PER_TOPIC){ + terrno = TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE; + code = terrno; + goto _over; + } + } // check topic existence - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe"); + pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe"); if (pTrans == NULL) { goto _over; } diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index c9afbaf524bd70bea4635518e05004a8d4042e1f..551b08d2be0d0bcc7aaff6fbeaacf30a61b45d16 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -585,6 +585,11 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) { SMqTopicObj *pTopic = NULL; SDbObj *pDb = NULL; SCMCreateTopicReq createTopicReq = {0}; + if (sdbGetSize(pMnode->pSdb, SDB_TOPIC) >= tmqMaxTopicNum){ + terrno = TSDB_CODE_TMQ_TOPIC_OUT_OF_RANGE; + mError("topic num out of range"); + return code; + } if (tDeserializeSCMCreateTopicReq(pReq->pCont, pReq->contLen, &createTopicReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index e4a00c1fc95e6e7c0143fbfecea07ec5a97158ae..e6ffec85ec0def87d1630251e4db5aefc0763634 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -629,7 +629,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_INVALID_FILE, "Index file is inval TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_MISMATCH, "Consumer mismatch") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_CLOSED, "Consumer closed") -TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_ERROR, "Consumer error, to see log") +TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_ERROR, "Consumer error, to see log") +TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_TOPIC_OUT_OF_RANGE, "Topic num out of range") +TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE, "Group num out of range 100") // stream TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist")