提交 18b7042a 编写于 作者: wmmhello's avatar wmmhello

Merge branch 'mark/tmq' of https://github.com/taosdata/TDengine into mark/tmq

...@@ -162,6 +162,8 @@ extern char tsSmlTagName[]; ...@@ -162,6 +162,8 @@ extern char tsSmlTagName[];
// extern bool tsSmlDataFormat; // extern bool tsSmlDataFormat;
// extern int32_t tsSmlBatchSize; // extern int32_t tsSmlBatchSize;
extern int32_t tmqMaxTopicNum;
// wal // wal
extern int64_t tsWalFsyncDataSizeLimit; extern int64_t tsWalFsyncDataSizeLimit;
......
...@@ -768,6 +768,8 @@ int32_t* taosGetErrno(); ...@@ -768,6 +768,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TMQ_CONSUMER_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x4001) #define TSDB_CODE_TMQ_CONSUMER_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x4001)
#define TSDB_CODE_TMQ_CONSUMER_CLOSED TAOS_DEF_ERROR_CODE(0, 0x4002) #define TSDB_CODE_TMQ_CONSUMER_CLOSED TAOS_DEF_ERROR_CODE(0, 0x4002)
#define TSDB_CODE_TMQ_CONSUMER_ERROR TAOS_DEF_ERROR_CODE(0, 0x4003) #define TSDB_CODE_TMQ_CONSUMER_ERROR TAOS_DEF_ERROR_CODE(0, 0x4003)
#define TSDB_CODE_TMQ_TOPIC_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x4004)
#define TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x4005)
// stream // stream
#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100) #define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100)
......
...@@ -103,6 +103,8 @@ char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; // user defined child table ...@@ -103,6 +103,8 @@ char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; // user defined child table
// bool tsSmlDataFormat = false; // bool tsSmlDataFormat = false;
// int32_t tsSmlBatchSize = 10000; // int32_t tsSmlBatchSize = 10000;
// tmq
int32_t tmqMaxTopicNum = 20;
// query // query
int32_t tsQueryPolicy = 1; int32_t tsQueryPolicy = 1;
int32_t tsQueryRspPolicy = 0; int32_t tsQueryRspPolicy = 0;
...@@ -777,6 +779,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { ...@@ -777,6 +779,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
tstrncpy(tsSmlTagName, cfgGetItem(pCfg, "smlTagName")->str, TSDB_COL_NAME_LEN); tstrncpy(tsSmlTagName, cfgGetItem(pCfg, "smlTagName")->str, TSDB_COL_NAME_LEN);
// tsSmlDataFormat = cfgGetItem(pCfg, "smlDataFormat")->bval; // tsSmlDataFormat = cfgGetItem(pCfg, "smlDataFormat")->bval;
tmqMaxTopicNum = cfgGetItem(pCfg, "tmqMaxTopicNum")->i32;
// tsSmlBatchSize = cfgGetItem(pCfg, "smlBatchSize")->i32; // tsSmlBatchSize = cfgGetItem(pCfg, "smlBatchSize")->i32;
tsMaxInsertBatchRows = cfgGetItem(pCfg, "maxInsertBatchRows")->i32; tsMaxInsertBatchRows = cfgGetItem(pCfg, "maxInsertBatchRows")->i32;
...@@ -1196,6 +1199,8 @@ int32_t taosApplyLocalCfg(SConfig *pCfg, char *name) { ...@@ -1196,6 +1199,8 @@ int32_t taosApplyLocalCfg(SConfig *pCfg, char *name) {
cfgSetItem(pCfg, "secondEp", tsSecond, pSecondpItem->stype); cfgSetItem(pCfg, "secondEp", tsSecond, pSecondpItem->stype);
} else if (strcasecmp("smlChildTableName", name) == 0) { } else if (strcasecmp("smlChildTableName", name) == 0) {
tstrncpy(tsSmlChildTableName, cfgGetItem(pCfg, "smlChildTableName")->str, TSDB_TABLE_NAME_LEN); tstrncpy(tsSmlChildTableName, cfgGetItem(pCfg, "smlChildTableName")->str, TSDB_TABLE_NAME_LEN);
} else if (strcasecmp("tmqMaxTopicNum", name) == 0) {
tmqMaxTopicNum = cfgGetItem(pCfg, "tmqMaxTopicNum")->i32;
} else if (strcasecmp("smlTagName", name) == 0) { } else if (strcasecmp("smlTagName", name) == 0) {
tstrncpy(tsSmlTagName, cfgGetItem(pCfg, "smlTagName")->str, TSDB_COL_NAME_LEN); tstrncpy(tsSmlTagName, cfgGetItem(pCfg, "smlTagName")->str, TSDB_COL_NAME_LEN);
// } else if (strcasecmp("smlDataFormat", name) == 0) { // } else if (strcasecmp("smlDataFormat", name) == 0) {
......
...@@ -26,6 +26,7 @@ ...@@ -26,6 +26,7 @@
#define MND_CONSUMER_VER_NUMBER 1 #define MND_CONSUMER_VER_NUMBER 1
#define MND_CONSUMER_RESERVE_SIZE 64 #define MND_CONSUMER_RESERVE_SIZE 64
#define MND_MAX_GROUP_PER_TOPIC 100
#define MND_CONSUMER_LOST_HB_CNT 6 #define MND_CONSUMER_LOST_HB_CNT 6
#define MND_CONSUMER_LOST_CLEAR_THRESHOLD 43200 #define MND_CONSUMER_LOST_CLEAR_THRESHOLD 43200
...@@ -220,10 +221,10 @@ static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) { ...@@ -220,10 +221,10 @@ static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
mInfo("consumer:0x%" PRIx64 " needs to be cleared, status %s", pClearMsg->consumerId, mInfo("consumer:0x%" PRIx64 " needs to be cleared, status %s", pClearMsg->consumerId,
mndConsumerStatusName(pConsumer->status)); mndConsumerStatusName(pConsumer->status));
if (pConsumer->status != MQ_CONSUMER_STATUS_LOST) { // if (pConsumer->status != MQ_CONSUMER_STATUS_LOST) {
mndReleaseConsumer(pMnode, pConsumer); // mndReleaseConsumer(pMnode, pConsumer);
return -1; // return -1;
} // }
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup); SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
// pConsumerNew->updateType = CONSUMER_UPDATE_TIMER_LOST; // pConsumerNew->updateType = CONSUMER_UPDATE_TIMER_LOST;
...@@ -316,22 +317,9 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { ...@@ -316,22 +317,9 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
hbStatus); hbStatus);
if (status == MQ_CONSUMER_STATUS_READY) { if (status == MQ_CONSUMER_STATUS_READY) {
if (hbStatus > MND_CONSUMER_LOST_HB_CNT) { if (taosArrayGetSize(pConsumer->assignedTopics) == 0) { // unsubscribe or close
// SMqConsumerLostMsg *pLostMsg = rpcMallocCont(sizeof(SMqConsumerLostMsg)); mndDropConsumerFromSdb(pMnode, pConsumer->consumerId);
// if (pLostMsg == NULL) { } else if (hbStatus > MND_CONSUMER_LOST_HB_CNT) {
// mError("consumer:0x%"PRIx64" failed to transfer consumer status to lost due to out of memory. alloc size:%d",
// pConsumer->consumerId, (int32_t)sizeof(SMqConsumerLostMsg));
// continue;
// }
//
// pLostMsg->consumerId = pConsumer->consumerId;
// SRpcMsg rpcMsg = {
// .msgType = TDMT_MND_TMQ_CONSUMER_LOST, .pCont = pLostMsg, .contLen = sizeof(SMqConsumerLostMsg)};
//
// mDebug("consumer:0x%"PRIx64" hb not received beyond threshold %d, set to lost", pConsumer->consumerId,
// MND_CONSUMER_LOST_HB_CNT);
// tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
taosRLockLatch(&pConsumer->lock); taosRLockLatch(&pConsumer->lock);
int32_t topicNum = taosArrayGetSize(pConsumer->currentTopics); int32_t topicNum = taosArrayGetSize(pConsumer->currentTopics);
for (int32_t i = 0; i < topicNum; i++) { for (int32_t i = 0; i < topicNum; i++) {
...@@ -344,8 +332,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { ...@@ -344,8 +332,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
taosRUnLockLatch(&pConsumer->lock); taosRUnLockLatch(&pConsumer->lock);
} }
} else if (status == MQ_CONSUMER_STATUS_LOST) { } else if (status == MQ_CONSUMER_STATUS_LOST) {
// if the client is lost longer than one day, clear it. Otherwise, do nothing about the lost consumers. if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) { // clear consumer if lost a day
if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD || taosArrayGetSize(pConsumer->assignedTopics) == 0) { // clear consumer if lost a day or unsubscribe/close
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId); mndDropConsumerFromSdb(pMnode, pConsumer->consumerId);
} }
} else { // MQ_CONSUMER_STATUS_REBALANCE } else { // MQ_CONSUMER_STATUS_REBALANCE
...@@ -455,7 +442,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { ...@@ -455,7 +442,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
mError("consumer:0x%" PRIx64 " group:%s not consistent with data in sdb, saved cgroup:%s", consumerId, req.cgroup, mError("consumer:0x%" PRIx64 " group:%s not consistent with data in sdb, saved cgroup:%s", consumerId, req.cgroup,
pConsumer->cgroup); pConsumer->cgroup);
terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST; terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
return -1; goto FAIL;
} }
atomic_store_32(&pConsumer->hbStatus, 0); atomic_store_32(&pConsumer->hbStatus, 0);
...@@ -480,7 +467,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { ...@@ -480,7 +467,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
if (status != MQ_CONSUMER_STATUS_READY) { if (status != MQ_CONSUMER_STATUS_READY) {
mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status)); mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status));
terrno = TSDB_CODE_MND_CONSUMER_NOT_READY; terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
return -1; goto FAIL;
} }
int32_t serverEpoch = atomic_load_32(&pConsumer->epoch); int32_t serverEpoch = atomic_load_32(&pConsumer->epoch);
...@@ -562,7 +549,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { ...@@ -562,7 +549,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
void *buf = rpcMallocCont(tlen); void *buf = rpcMallocCont(tlen);
if (buf == NULL) { if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; goto FAIL;
} }
SMqRspHead* pHead = buf; SMqRspHead* pHead = buf;
...@@ -649,6 +636,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { ...@@ -649,6 +636,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
char *cgroup = subscribe.cgroup; char *cgroup = subscribe.cgroup;
SMqConsumerObj *pExistedConsumer = NULL; SMqConsumerObj *pExistedConsumer = NULL;
SMqConsumerObj *pConsumerNew = NULL; SMqConsumerObj *pConsumerNew = NULL;
STrans *pTrans = NULL;
int32_t code = -1; int32_t code = -1;
SArray *pTopicList = subscribe.topicNames; SArray *pTopicList = subscribe.topicNames;
...@@ -656,9 +644,17 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { ...@@ -656,9 +644,17 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
taosArrayRemoveDuplicate(pTopicList, taosArrayCompareString, freeItem); taosArrayRemoveDuplicate(pTopicList, taosArrayCompareString, freeItem);
int32_t newTopicNum = taosArrayGetSize(pTopicList); int32_t newTopicNum = taosArrayGetSize(pTopicList);
for(int i = 0; i < newTopicNum; i++){
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, (const char*)cgroup, (const char*)taosArrayGetP(pTopicList, i));
if(pSub != NULL && taosHashGetSize(pSub->consumerHash) > MND_MAX_GROUP_PER_TOPIC){
terrno = TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE;
code = terrno;
goto _over;
}
}
// check topic existence // check topic existence
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe"); pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe");
if (pTrans == NULL) { if (pTrans == NULL) {
goto _over; goto _over;
} }
......
...@@ -175,7 +175,7 @@ static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubsc ...@@ -175,7 +175,7 @@ static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubsc
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
if (pVgObj == NULL) { if (pVgObj == NULL) {
taosMemoryFree(buf); taosMemoryFree(buf);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_MND_VGROUP_NOT_EXIST;
return -1; return -1;
} }
......
...@@ -585,6 +585,11 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) { ...@@ -585,6 +585,11 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
SMqTopicObj *pTopic = NULL; SMqTopicObj *pTopic = NULL;
SDbObj *pDb = NULL; SDbObj *pDb = NULL;
SCMCreateTopicReq createTopicReq = {0}; SCMCreateTopicReq createTopicReq = {0};
if (sdbGetSize(pMnode->pSdb, SDB_TOPIC) >= tmqMaxTopicNum){
terrno = TSDB_CODE_TMQ_TOPIC_OUT_OF_RANGE;
mError("topic num out of range");
return code;
}
if (tDeserializeSCMCreateTopicReq(pReq->pCont, pReq->contLen, &createTopicReq) != 0) { if (tDeserializeSCMCreateTopicReq(pReq->pCont, pReq->contLen, &createTopicReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
...@@ -699,6 +704,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { ...@@ -699,6 +704,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
if (pConsumer->status == MQ_CONSUMER_STATUS_LOST){ if (pConsumer->status == MQ_CONSUMER_STATUS_LOST){
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId); mndDropConsumerFromSdb(pMnode, pConsumer->consumerId);
mndReleaseConsumer(pMnode, pConsumer);
continue; continue;
} }
......
...@@ -629,7 +629,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_INVALID_FILE, "Index file is inval ...@@ -629,7 +629,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_INVALID_FILE, "Index file is inval
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_MISMATCH, "Consumer mismatch") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_MISMATCH, "Consumer mismatch")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_CLOSED, "Consumer closed") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_CLOSED, "Consumer closed")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_ERROR, "Consumer error, to see log") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_ERROR, "Consumer error, to see log")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_TOPIC_OUT_OF_RANGE, "Topic num out of range")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE, "Group num out of range 100")
// stream // stream
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist") TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册