diff --git a/include/common/tglobal.h b/include/common/tglobal.h index b08916f891fba90f246206956b19741d916914cb..5dff313a9de8164cf26484a3e8ad11eec43e0167 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -162,6 +162,8 @@ extern char tsSmlTagName[]; // extern bool tsSmlDataFormat; // extern int32_t tsSmlBatchSize; +extern int32_t tmqMaxTopicNum; + // wal extern int64_t tsWalFsyncDataSizeLimit; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index db076838f2a58e1680fc652b3f40b133bed7fc53..cc8b9472846e7225eac3e09400bdbf9f9b114c0c 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -145,7 +145,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DROP_TOPIC, "drop-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_SUBSCRIBE, "subscribe", SCMSubscribeReq, SCMSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_ASK_EP, "ask-ep", SMqAskEpReq, SMqAskEpRsp) - TD_DEF_MSG_TYPE(TDMT_MND_TMQ_CONSUMER_LOST, "consumer-lost", SMqConsumerLostMsg, NULL) +// TD_DEF_MSG_TYPE(TDMT_MND_TMQ_CONSUMER_LOST, "consumer-lost", SMqConsumerLostMsg, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_CONSUMER_RECOVER, "consumer-recover", SMqConsumerRecoverMsg, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_HB, "consumer-hb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DO_REBALANCE, "do-rebalance", SMqDoRebalanceMsg, NULL) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index b1c8ada6a482f3217b2ba6c06a16b10130580f4c..6147ad682064a0cb0efded329e0fa64289d756f6 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -768,6 +768,8 @@ int32_t* taosGetErrno(); #define TSDB_CODE_TMQ_CONSUMER_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x4001) #define TSDB_CODE_TMQ_CONSUMER_CLOSED TAOS_DEF_ERROR_CODE(0, 0x4002) #define TSDB_CODE_TMQ_CONSUMER_ERROR TAOS_DEF_ERROR_CODE(0, 0x4003) +#define TSDB_CODE_TMQ_TOPIC_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x4004) +#define TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x4005) // stream #define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100) diff --git a/source/common/src/systable.c b/source/common/src/systable.c index bb0e882fcdd1fdd77f0af49cbd3a90423bb4edb3..da47293773324b409da38fef6fb5625bacbe2534 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -290,7 +290,7 @@ static const SSysDbTableSchema subscriptionSchema[] = { {.name = "topic_name", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "consumer_group", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, - {.name = "consumer_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, + {.name = "consumer_id", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, }; static const SSysDbTableSchema vnodesSchema[] = { @@ -350,7 +350,7 @@ static const SSysDbTableSchema connectionsSchema[] = { static const SSysDbTableSchema consumerSchema[] = { - {.name = "consumer_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, + {.name = "consumer_id", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "consumer_group", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "client_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index dd64538f3d89a9a779ca511ccb35b8f6cd94b693..810425fb70048f12e498e4f31c1048a400181faf 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -103,6 +103,8 @@ char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; // user defined child table // bool tsSmlDataFormat = false; // int32_t tsSmlBatchSize = 10000; +// tmq +int32_t tmqMaxTopicNum = 20; // query int32_t tsQueryPolicy = 1; int32_t tsQueryRspPolicy = 0; @@ -507,6 +509,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddString(pCfg, "telemetryServer", tsTelemServer, 0) != 0) return -1; if (cfgAddInt32(pCfg, "telemetryPort", tsTelemPort, 1, 65056, 0) != 0) return -1; + if (cfgAddInt32(pCfg, "tmqMaxTopicNum", tmqMaxTopicNum, 1, 10000, 1) != 0) return -1; + if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, 1) != 0) return -1; if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, 1) != 0) return -1; if (cfgAddInt32(pCfg, "ttlUnit", tsTtlUnit, 1, 86400 * 365, 1) != 0) return -1; @@ -875,6 +879,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tstrncpy(tsTelemServer, cfgGetItem(pCfg, "telemetryServer")->str, TSDB_FQDN_LEN); tsTelemPort = (uint16_t)cfgGetItem(pCfg, "telemetryPort")->i32; + tmqMaxTopicNum= cfgGetItem(pCfg, "tmqMaxTopicNum")->i32; + tsTransPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32; tsMqRebalanceInterval = cfgGetItem(pCfg, "mqRebalanceInterval")->i32; tsTtlUnit = cfgGetItem(pCfg, "ttlUnit")->i32; diff --git a/source/dnode/mnode/impl/inc/mndConsumer.h b/source/dnode/mnode/impl/inc/mndConsumer.h index 96401511d2cd4832ad6d548a4b7286ba62227a7d..a3a31cfc5a5cfd0fdde3830ab015d2ca8cd72c98 100644 --- a/source/dnode/mnode/impl/inc/mndConsumer.h +++ b/source/dnode/mnode/impl/inc/mndConsumer.h @@ -25,14 +25,15 @@ extern "C" { enum { MQ_CONSUMER_STATUS_REBALANCE = 1, // MQ_CONSUMER_STATUS__MODIFY_IN_REB, // this value is not used anymore - MQ_CONSUMER_STATUS__READY, - MQ_CONSUMER_STATUS__LOST, + MQ_CONSUMER_STATUS_READY, + MQ_CONSUMER_STATUS_LOST, // MQ_CONSUMER_STATUS__LOST_IN_REB, // this value is not used anymore - MQ_CONSUMER_STATUS__LOST_REBD, -}; +// MQ_CONSUMER_STATUS__LOST_REBD, +};\ int32_t mndInitConsumer(SMnode *pMnode); void mndCleanupConsumer(SMnode *pMnode); +void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId); SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId); void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index e913ecb6ab7d00f2fc0d12c636ab025cb809a27b..a36627718de8ce8798025f23bd617749c8c5b21f 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -137,12 +137,12 @@ typedef enum { } EDndReason; typedef enum { - CONSUMER_UPDATE__TOUCH = 1, // rebalance req do not need change consume topic - CONSUMER_UPDATE__ADD, - CONSUMER_UPDATE__REMOVE, - CONSUMER_UPDATE__LOST, - CONSUMER_UPDATE__RECOVER, - CONSUMER_UPDATE__REBALANCE, // subscribe req need change consume topic + CONSUMER_UPDATE_REB_MODIFY_NOTOPIC = 1, // topic do not need modified after rebalance + CONSUMER_UPDATE_REB_MODIFY_TOPIC, // topic need modified after rebalance + CONSUMER_UPDATE_REB_MODIFY_REMOVE, // topic need removed after rebalance +// CONSUMER_UPDATE_TIMER_LOST, + CONSUMER_UPDATE_RECOVER, + CONSUMER_UPDATE_SUB_MODIFY, // modify after subscribe req } ECsmUpdateType; typedef struct { @@ -548,13 +548,13 @@ typedef struct { // data for display int32_t pid; SEpSet ep; - int64_t upTime; + int64_t createTime; int64_t subscribeTime; int64_t rebalanceTime; } SMqConsumerObj; SMqConsumerObj* tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]); -void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer); +void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer, bool delete); int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer); void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer); diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index f5301285726944119f6c7da1bf4587ece1919db8..4395446e813f1c70e8e16d4f281646d09ce17e3c 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -26,6 +26,7 @@ #define MND_CONSUMER_VER_NUMBER 1 #define MND_CONSUMER_RESERVE_SIZE 64 +#define MND_MAX_GROUP_PER_TOPIC 100 #define MND_CONSUMER_LOST_HB_CNT 6 #define MND_CONSUMER_LOST_CLEAR_THRESHOLD 43200 @@ -63,7 +64,7 @@ int32_t mndInitConsumer(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessMqTimerMsg); - mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_LOST, mndProcessConsumerLostMsg); +// mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_LOST, mndProcessConsumerLostMsg); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg); @@ -75,6 +76,22 @@ int32_t mndInitConsumer(SMnode *pMnode) { void mndCleanupConsumer(SMnode *pMnode) {} +void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId){ + SMqConsumerClearMsg *pClearMsg = rpcMallocCont(sizeof(SMqConsumerClearMsg)); + if (pClearMsg == NULL) { + mError("consumer:0x%"PRIx64" failed to clear consumer due to out of memory. alloc size:%d", consumerId, (int32_t)sizeof(SMqConsumerClearMsg)); + return; + } + + pClearMsg->consumerId = consumerId; + SRpcMsg rpcMsg = { + .msgType = TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, .pCont = pClearMsg, .contLen = sizeof(SMqConsumerClearMsg)}; + + mInfo("consumer:0x%" PRIx64 " drop from sdb", consumerId); + tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); + return; +} + bool mndRebTryStart() { int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1); mDebug("tq timer, rebalance counter old val:%d", old); @@ -105,50 +122,48 @@ void mndRebCntDec() { } } -static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) { - SMnode *pMnode = pMsg->info.node; - SMqConsumerLostMsg *pLostMsg = pMsg->pCont; - SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pLostMsg->consumerId); - if (pConsumer == NULL) { - return 0; - } - - mInfo("process consumer lost msg, consumer:0x%" PRIx64 " status:%d(%s)", pLostMsg->consumerId, pConsumer->status, - mndConsumerStatusName(pConsumer->status)); - - if (pConsumer->status != MQ_CONSUMER_STATUS__READY) { - mndReleaseConsumer(pMnode, pConsumer); - return -1; - } - - SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup); - pConsumerNew->updateType = CONSUMER_UPDATE__LOST; - - mndReleaseConsumer(pMnode, pConsumer); - - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "lost-csm"); - if (pTrans == NULL) { - goto FAIL; - } - - if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { - goto FAIL; - } - - if (mndTransPrepare(pMnode, pTrans) != 0) { - goto FAIL; - } - - tDeleteSMqConsumerObj(pConsumerNew); - taosMemoryFree(pConsumerNew); - mndTransDrop(pTrans); - return 0; -FAIL: - tDeleteSMqConsumerObj(pConsumerNew); - taosMemoryFree(pConsumerNew); - mndTransDrop(pTrans); - return -1; -} +//static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) { +// SMnode *pMnode = pMsg->info.node; +// SMqConsumerLostMsg *pLostMsg = pMsg->pCont; +// SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pLostMsg->consumerId); +// if (pConsumer == NULL) { +// return 0; +// } +// +// mInfo("process consumer lost msg, consumer:0x%" PRIx64 " status:%d(%s)", pLostMsg->consumerId, pConsumer->status, +// mndConsumerStatusName(pConsumer->status)); +// +// if (pConsumer->status != MQ_CONSUMER_STATUS_READY) { +// mndReleaseConsumer(pMnode, pConsumer); +// return -1; +// } +// +// SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup); +// pConsumerNew->updateType = CONSUMER_UPDATE_TIMER_LOST; +// +// mndReleaseConsumer(pMnode, pConsumer); +// +// STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "lost-csm"); +// if (pTrans == NULL) { +// goto FAIL; +// } +// +// if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { +// goto FAIL; +// } +// +// if (mndTransPrepare(pMnode, pTrans) != 0) { +// goto FAIL; +// } +// +// tDeleteSMqConsumerObj(pConsumerNew, true); +// mndTransDrop(pTrans); +// return 0; +//FAIL: +// tDeleteSMqConsumerObj(pConsumerNew, true); +// mndTransDrop(pTrans); +// return -1; +//} static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; @@ -162,14 +177,14 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) { mInfo("receive consumer recover msg, consumer:0x%" PRIx64 " status:%d(%s)", pRecoverMsg->consumerId, pConsumer->status, mndConsumerStatusName(pConsumer->status)); - if (pConsumer->status != MQ_CONSUMER_STATUS__LOST_REBD) { + if (pConsumer->status != MQ_CONSUMER_STATUS_LOST) { mndReleaseConsumer(pMnode, pConsumer); terrno = TSDB_CODE_MND_CONSUMER_NOT_READY; return -1; } SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup); - pConsumerNew->updateType = CONSUMER_UPDATE__RECOVER; + pConsumerNew->updateType = CONSUMER_UPDATE_RECOVER; mndReleaseConsumer(pMnode, pConsumer); @@ -181,13 +196,13 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) { if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL; if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL; - tDeleteSMqConsumerObj(pConsumerNew); - taosMemoryFree(pConsumerNew); + tDeleteSMqConsumerObj(pConsumerNew, true); + mndTransDrop(pTrans); return 0; FAIL: - tDeleteSMqConsumerObj(pConsumerNew); - taosMemoryFree(pConsumerNew); + tDeleteSMqConsumerObj(pConsumerNew, true); + mndTransDrop(pTrans); return -1; } @@ -206,13 +221,13 @@ static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) { mInfo("consumer:0x%" PRIx64 " needs to be cleared, status %s", pClearMsg->consumerId, mndConsumerStatusName(pConsumer->status)); - if (pConsumer->status != MQ_CONSUMER_STATUS__LOST_REBD) { - mndReleaseConsumer(pMnode, pConsumer); - return -1; - } +// if (pConsumer->status != MQ_CONSUMER_STATUS_LOST) { +// mndReleaseConsumer(pMnode, pConsumer); +// return -1; +// } SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup); - pConsumerNew->updateType = CONSUMER_UPDATE__LOST; +// pConsumerNew->updateType = CONSUMER_UPDATE_TIMER_LOST; mndReleaseConsumer(pMnode, pConsumer); @@ -223,14 +238,14 @@ static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) { if (mndSetConsumerDropLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL; if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL; - tDeleteSMqConsumerObj(pConsumerNew); - taosMemoryFree(pConsumerNew); + tDeleteSMqConsumerObj(pConsumerNew, true); + mndTransDrop(pTrans); return 0; FAIL: - tDeleteSMqConsumerObj(pConsumerNew); - taosMemoryFree(pConsumerNew); + tDeleteSMqConsumerObj(pConsumerNew, true); + mndTransDrop(pTrans); return -1; } @@ -297,56 +312,29 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1); int32_t status = atomic_load_32(&pConsumer->status); - mDebug("check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", uptime:%" PRId64 ", hbstatus:%d", - pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->upTime, + mDebug("check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64 ", hbstatus:%d", + pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->createTime, hbStatus); - if (status == MQ_CONSUMER_STATUS__READY) { - if (hbStatus > MND_CONSUMER_LOST_HB_CNT) { - SMqConsumerLostMsg *pLostMsg = rpcMallocCont(sizeof(SMqConsumerLostMsg)); - if (pLostMsg == NULL) { - mError("consumer:0x%"PRIx64" failed to transfer consumer status to lost due to out of memory. alloc size:%d", - pConsumer->consumerId, (int32_t)sizeof(SMqConsumerLostMsg)); - continue; - } - - pLostMsg->consumerId = pConsumer->consumerId; - SRpcMsg rpcMsg = { - .msgType = TDMT_MND_TMQ_CONSUMER_LOST, .pCont = pLostMsg, .contLen = sizeof(SMqConsumerLostMsg)}; - - mDebug("consumer:0x%"PRIx64" hb not received beyond threshold %d, set to lost", pConsumer->consumerId, - MND_CONSUMER_LOST_HB_CNT); - tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); - } - } else if (status == MQ_CONSUMER_STATUS__LOST_REBD) { - // if the client is lost longer than one day, clear it. Otherwise, do nothing about the lost consumers. - if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) { - SMqConsumerClearMsg *pClearMsg = rpcMallocCont(sizeof(SMqConsumerClearMsg)); - if (pClearMsg == NULL) { - mError("consumer:0x%"PRIx64" failed to clear consumer due to out of memory. alloc size:%d", - pConsumer->consumerId, (int32_t)sizeof(SMqConsumerClearMsg)); - continue; + if (status == MQ_CONSUMER_STATUS_READY) { + if (taosArrayGetSize(pConsumer->assignedTopics) == 0) { // unsubscribe or close + mndDropConsumerFromSdb(pMnode, pConsumer->consumerId); + } else if (hbStatus > MND_CONSUMER_LOST_HB_CNT) { + taosRLockLatch(&pConsumer->lock); + int32_t topicNum = taosArrayGetSize(pConsumer->currentTopics); + for (int32_t i = 0; i < topicNum; i++) { + char key[TSDB_SUBSCRIBE_KEY_LEN]; + char *removedTopic = taosArrayGetP(pConsumer->currentTopics, i); + mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic); + SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key); + taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId); } - - pClearMsg->consumerId = pConsumer->consumerId; - SRpcMsg rpcMsg = { - .msgType = TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, .pCont = pClearMsg, .contLen = sizeof(SMqConsumerClearMsg)}; - - mDebug("consumer:0x%" PRIx64 " lost beyond threshold %d, clear it", pConsumer->consumerId, - MND_CONSUMER_LOST_CLEAR_THRESHOLD); - tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); + taosRUnLockLatch(&pConsumer->lock); } - } else if (status == MQ_CONSUMER_STATUS__LOST) { - taosRLockLatch(&pConsumer->lock); - int32_t topicNum = taosArrayGetSize(pConsumer->currentTopics); - for (int32_t i = 0; i < topicNum; i++) { - char key[TSDB_SUBSCRIBE_KEY_LEN]; - char *removedTopic = taosArrayGetP(pConsumer->currentTopics, i); - mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic); - SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key); - taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId); + } else if (status == MQ_CONSUMER_STATUS_LOST) { + if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) { // clear consumer if lost a day + mndDropConsumerFromSdb(pMnode, pConsumer->consumerId); } - taosRUnLockLatch(&pConsumer->lock); } else { // MQ_CONSUMER_STATUS_REBALANCE taosRLockLatch(&pConsumer->lock); @@ -411,7 +399,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { int32_t status = atomic_load_32(&pConsumer->status); - if (status == MQ_CONSUMER_STATUS__LOST_REBD) { + if (status == MQ_CONSUMER_STATUS_LOST) { mInfo("try to recover consumer:0x%" PRIx64 "", consumerId); SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg)); @@ -454,7 +442,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { mError("consumer:0x%" PRIx64 " group:%s not consistent with data in sdb, saved cgroup:%s", consumerId, req.cgroup, pConsumer->cgroup); terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST; - return -1; + goto FAIL; } atomic_store_32(&pConsumer->hbStatus, 0); @@ -462,7 +450,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { // 1. check consumer status int32_t status = atomic_load_32(&pConsumer->status); - if (status == MQ_CONSUMER_STATUS__LOST_REBD) { + if (status == MQ_CONSUMER_STATUS_LOST) { mInfo("try to recover consumer:0x%" PRIx64, consumerId); SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg)); @@ -476,10 +464,10 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg); } - if (status != MQ_CONSUMER_STATUS__READY) { + if (status != MQ_CONSUMER_STATUS_READY) { mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status)); terrno = TSDB_CODE_MND_CONSUMER_NOT_READY; - return -1; + goto FAIL; } int32_t serverEpoch = atomic_load_32(&pConsumer->epoch); @@ -561,7 +549,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { void *buf = rpcMallocCont(tlen); if (buf == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + goto FAIL; } SMqRspHead* pHead = buf; @@ -648,6 +636,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { char *cgroup = subscribe.cgroup; SMqConsumerObj *pExistedConsumer = NULL; SMqConsumerObj *pConsumerNew = NULL; + STrans *pTrans = NULL; int32_t code = -1; SArray *pTopicList = subscribe.topicNames; @@ -655,9 +644,23 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { taosArrayRemoveDuplicate(pTopicList, taosArrayCompareString, freeItem); int32_t newTopicNum = taosArrayGetSize(pTopicList); + for(int i = 0; i < newTopicNum; i++){ + SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, (const char*)cgroup, (const char*)taosArrayGetP(pTopicList, i)); + if(pSub == NULL) continue; + taosRLockLatch(&pSub->lock); + if(taosHashGetSize(pSub->consumerHash) > MND_MAX_GROUP_PER_TOPIC){ + terrno = TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE; + code = terrno; + taosRUnLockLatch(&pSub->lock); + mndReleaseSubscribe(pMnode, pSub); + goto _over; + } + taosRUnLockLatch(&pSub->lock); + mndReleaseSubscribe(pMnode, pSub); + } // check topic existence - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe"); + pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe"); if (pTrans == NULL) { goto _over; } @@ -675,8 +678,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup); tstrncpy(pConsumerNew->clientId, subscribe.clientId, tListLen(pConsumerNew->clientId)); - // set the update type - pConsumerNew->updateType = CONSUMER_UPDATE__REBALANCE; +// pConsumerNew->updateType = CONSUMER_UPDATE_SUB_MODIFY; // use insert logic taosArrayDestroy(pConsumerNew->assignedTopics); pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup); @@ -695,7 +697,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { " cgroup:%s, current status:%d(%s), subscribe topic num: %d", consumerId, subscribe.cgroup, status, mndConsumerStatusName(status), newTopicNum); - if (status != MQ_CONSUMER_STATUS__READY) { + if (status != MQ_CONSUMER_STATUS_READY) { terrno = TSDB_CODE_MND_CONSUMER_NOT_READY; goto _over; } @@ -706,11 +708,11 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { } // set the update type - pConsumerNew->updateType = CONSUMER_UPDATE__REBALANCE; + pConsumerNew->updateType = CONSUMER_UPDATE_SUB_MODIFY; taosArrayDestroy(pConsumerNew->assignedTopics); pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup); - int32_t oldTopicNum = (pExistedConsumer->currentTopics) ? taosArrayGetSize(pExistedConsumer->currentTopics) : 0; + int32_t oldTopicNum = taosArrayGetSize(pExistedConsumer->currentTopics); int32_t i = 0, j = 0; while (i < oldTopicNum || j < newTopicNum) { @@ -765,10 +767,7 @@ _over: mndReleaseConsumer(pMnode, pExistedConsumer); } - if (pConsumerNew) { - tDeleteSMqConsumerObj(pConsumerNew); - taosMemoryFree(pConsumerNew); - } + tDeleteSMqConsumerObj(pConsumerNew, true); // TODO: replace with destroy subscribe msg taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree); @@ -868,17 +867,17 @@ CM_DECODE_OVER: } static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) { - mDebug("consumer:0x%" PRIx64 " cgroup:%s status:%d(%s) epoch:%d load from sdb, perform insert action", + mInfo("consumer:0x%" PRIx64 " sub insert, cgroup:%s status:%d(%s) epoch:%d", pConsumer->consumerId, pConsumer->cgroup, pConsumer->status, mndConsumerStatusName(pConsumer->status), pConsumer->epoch); - pConsumer->subscribeTime = pConsumer->upTime; + pConsumer->subscribeTime = taosGetTimestampMs(); return 0; } static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) { - mDebug("consumer:0x%" PRIx64 " perform delete action, status:(%d)%s", pConsumer->consumerId, pConsumer->status, + mInfo("consumer:0x%" PRIx64 " perform delete action, status:(%d)%s", pConsumer->consumerId, pConsumer->status, mndConsumerStatusName(pConsumer->status)); - tDeleteSMqConsumerObj(pConsumer); + tDeleteSMqConsumerObj(pConsumer, false); return 0; } @@ -887,10 +886,9 @@ static void updateConsumerStatus(SMqConsumerObj *pConsumer) { if (taosArrayGetSize(pConsumer->rebNewTopics) == 0 && taosArrayGetSize(pConsumer->rebRemovedTopics) == 0) { if (status == MQ_CONSUMER_STATUS_REBALANCE) { - pConsumer->status = MQ_CONSUMER_STATUS__READY; - } else if (status == MQ_CONSUMER_STATUS__LOST) { - ASSERT(taosArrayGetSize(pConsumer->currentTopics) == 0); - pConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD; + pConsumer->status = MQ_CONSUMER_STATUS_READY; + } else if (status == MQ_CONSUMER_STATUS_READY) { + pConsumer->status = MQ_CONSUMER_STATUS_LOST; } } } @@ -904,7 +902,7 @@ static void removeFromNewTopicList(SMqConsumerObj *pConsumer, const char *pTopic taosArrayRemove(pConsumer->rebNewTopics, i); taosMemoryFree(p); - mDebug("consumer:0x%" PRIx64 " remove new topic:%s in the topic list, remain newTopics:%d", pConsumer->consumerId, + mInfo("consumer:0x%" PRIx64 " remove new topic:%s in the topic list, remain newTopics:%d", pConsumer->consumerId, pTopic, (int)taosArrayGetSize(pConsumer->rebNewTopics)); break; } @@ -920,7 +918,7 @@ static void removeFromRemoveTopicList(SMqConsumerObj *pConsumer, const char *pTo taosArrayRemove(pConsumer->rebRemovedTopics, i); taosMemoryFree(p); - mDebug("consumer:0x%" PRIx64 " remove topic:%s in the removed topic list, remain removedTopics:%d", + mInfo("consumer:0x%" PRIx64 " remove topic:%s in the removed topic list, remain removedTopics:%d", pConsumer->consumerId, pTopic, (int)taosArrayGetSize(pConsumer->rebRemovedTopics)); break; } @@ -935,7 +933,7 @@ static void removeFromCurrentTopicList(SMqConsumerObj *pConsumer, const char *pT taosArrayRemove(pConsumer->currentTopics, i); taosMemoryFree(topic); - mDebug("consumer:0x%" PRIx64 " remove topic:%s in the current topic list, remain currentTopics:%d", + mInfo("consumer:0x%" PRIx64 " remove topic:%s in the current topic list, remain currentTopics:%d", pConsumer->consumerId, pTopic, (int)taosArrayGetSize(pConsumer->currentTopics)); break; } @@ -958,47 +956,46 @@ static bool existInCurrentTopicList(const SMqConsumerObj* pConsumer, const char* } static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) { - mDebug("consumer:0x%" PRIx64 " perform update action, update type:%d, subscribe-time:%" PRId64 ", uptime:%" PRId64, - pOldConsumer->consumerId, pNewConsumer->updateType, pOldConsumer->subscribeTime, pOldConsumer->upTime); + mInfo("consumer:0x%" PRIx64 " perform update action, update type:%d, subscribe-time:%" PRId64 ", createTime:%" PRId64, + pOldConsumer->consumerId, pNewConsumer->updateType, pOldConsumer->subscribeTime, pOldConsumer->createTime); taosWLockLatch(&pOldConsumer->lock); - if (pNewConsumer->updateType == CONSUMER_UPDATE__REBALANCE) { + if (pNewConsumer->updateType == CONSUMER_UPDATE_SUB_MODIFY) { TSWAP(pOldConsumer->rebNewTopics, pNewConsumer->rebNewTopics); TSWAP(pOldConsumer->rebRemovedTopics, pNewConsumer->rebRemovedTopics); TSWAP(pOldConsumer->assignedTopics, pNewConsumer->assignedTopics); - pOldConsumer->subscribeTime = pNewConsumer->upTime; + pOldConsumer->subscribeTime = taosGetTimestampMs(); pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE; - } else if (pNewConsumer->updateType == CONSUMER_UPDATE__LOST) { - int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics); - for (int32_t i = 0; i < sz; i++) { - char *topic = taosStrdup(taosArrayGetP(pOldConsumer->currentTopics, i)); - taosArrayPush(pOldConsumer->rebRemovedTopics, &topic); - } - - pOldConsumer->rebalanceTime = pNewConsumer->upTime; - - int32_t prevStatus = pOldConsumer->status; - pOldConsumer->status = MQ_CONSUMER_STATUS__LOST; - mDebug("consumer:0x%" PRIx64 " state %s -> %s, reb-time:%" PRId64 ", reb-removed-topics:%d", - pOldConsumer->consumerId, mndConsumerStatusName(prevStatus), mndConsumerStatusName(pOldConsumer->status), - pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics)); - } else if (pNewConsumer->updateType == CONSUMER_UPDATE__RECOVER) { + mInfo("consumer:0x%" PRIx64 " sub update, modify existed consumer",pOldConsumer->consumerId); +// } else if (pNewConsumer->updateType == CONSUMER_UPDATE_TIMER_LOST) { +// int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics); +// for (int32_t i = 0; i < sz; i++) { +// char *topic = taosStrdup(taosArrayGetP(pOldConsumer->currentTopics, i)); +// taosArrayPush(pOldConsumer->rebRemovedTopics, &topic); +// } +// +// int32_t prevStatus = pOldConsumer->status; +// pOldConsumer->status = MQ_CONSUMER_STATUS_LOST; +// mInfo("consumer:0x%" PRIx64 " timer update, timer lost. state %s -> %s, reb-time:%" PRId64 ", reb-removed-topics:%d", +// pOldConsumer->consumerId, mndConsumerStatusName(prevStatus), mndConsumerStatusName(pOldConsumer->status), +// pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics)); + } else if (pNewConsumer->updateType == CONSUMER_UPDATE_RECOVER) { int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics); for (int32_t i = 0; i < sz; i++) { char *topic = taosStrdup(taosArrayGetP(pOldConsumer->assignedTopics, i)); taosArrayPush(pOldConsumer->rebNewTopics, &topic); } - pOldConsumer->rebalanceTime = pNewConsumer->upTime; pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE; - } else if (pNewConsumer->updateType == CONSUMER_UPDATE__TOUCH) { + mInfo("consumer:0x%" PRIx64 " timer update, timer recover",pOldConsumer->consumerId); + } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB_MODIFY_NOTOPIC) { atomic_add_fetch_32(&pOldConsumer->epoch, 1); - pOldConsumer->rebalanceTime = pNewConsumer->upTime; - - } else if (pNewConsumer->updateType == CONSUMER_UPDATE__ADD) { + pOldConsumer->rebalanceTime = taosGetTimestampMs(); + mInfo("consumer:0x%" PRIx64 " reb update, only rebalance time", pOldConsumer->consumerId); + } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB_MODIFY_TOPIC) { char *pNewTopic = taosStrdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0)); // check if exist in current topic @@ -1007,6 +1004,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, // add to current topic bool existing = existInCurrentTopicList(pOldConsumer, pNewTopic); if (existing) { + mError("consumer:0x%" PRIx64 "new topic:%s should not in currentTopics", pOldConsumer->consumerId, pNewTopic); taosMemoryFree(pNewTopic); } else { // added into current topic list taosArrayPush(pOldConsumer->currentTopics, &pNewTopic); @@ -1018,17 +1016,17 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, updateConsumerStatus(pOldConsumer); // the re-balance is triggered when the new consumer is launched. - pOldConsumer->rebalanceTime = pNewConsumer->upTime; + pOldConsumer->rebalanceTime = taosGetTimestampMs(); atomic_add_fetch_32(&pOldConsumer->epoch, 1); - mDebug("consumer:0x%" PRIx64 " state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64 + mInfo("consumer:0x%" PRIx64 " reb update add, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64 ", current topics:%d, newTopics:%d, removeTopics:%d", pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status, mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics), (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics)); - } else if (pNewConsumer->updateType == CONSUMER_UPDATE__REMOVE) { + } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB_MODIFY_REMOVE) { char *removedTopic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0); // remove from removed topic @@ -1041,10 +1039,10 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, int32_t status = pOldConsumer->status; updateConsumerStatus(pOldConsumer); - pOldConsumer->rebalanceTime = pNewConsumer->upTime; + pOldConsumer->rebalanceTime = taosGetTimestampMs(); atomic_add_fetch_32(&pOldConsumer->epoch, 1); - mDebug("consumer:0x%" PRIx64 " state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64 + mInfo("consumer:0x%" PRIx64 " reb update remove, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64 ", current topics:%d, newTopics:%d, removeTopics:%d", pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status, mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime, @@ -1107,8 +1105,12 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * int32_t cols = 0; // consumer id + char consumerIdHex[32] = {0}; + sprintf(varDataVal(consumerIdHex), "0x%"PRIx64, pConsumer->consumerId); + varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex))); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->consumerId, false); + colDataSetVal(pColInfo, numOfRows, (const char *)consumerIdHex, false); // consumer group char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; @@ -1149,7 +1151,7 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * // up time pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->upTime, false); + colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->createTime, false); // subscribe time pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); @@ -1179,10 +1181,9 @@ static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) { static const char *mndConsumerStatusName(int status) { switch (status) { - case MQ_CONSUMER_STATUS__READY: + case MQ_CONSUMER_STATUS_READY: return "ready"; - case MQ_CONSUMER_STATUS__LOST: - case MQ_CONSUMER_STATUS__LOST_REBD: + case MQ_CONSUMER_STATUS_LOST: return "lost"; case MQ_CONSUMER_STATUS_REBALANCE: return "rebalancing"; diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 986241b20d6cfcc299d26675a25b7aec9360d638..6efca3d4253a9a6183b66ca6ed352a6c9543a890 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -218,7 +218,7 @@ void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp, int8_t sver) { return (void *)buf; } -SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]) { +SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char* cgroup) { SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj)); if (pConsumer == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -249,16 +249,20 @@ SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_L return NULL; } - pConsumer->upTime = taosGetTimestampMs(); + pConsumer->createTime = taosGetTimestampMs(); return pConsumer; } -void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer) { +void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer, bool delete) { + if(pConsumer == NULL) return; taosArrayDestroyP(pConsumer->currentTopics, (FDelete)taosMemoryFree); taosArrayDestroyP(pConsumer->rebNewTopics, (FDelete)taosMemoryFree); taosArrayDestroyP(pConsumer->rebRemovedTopics, (FDelete)taosMemoryFree); taosArrayDestroyP(pConsumer->assignedTopics, (FDelete)taosMemoryFree); + if(delete){ + taosMemoryFree(pConsumer); + } } int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) { @@ -273,7 +277,7 @@ int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) { tlen += taosEncodeFixedI32(buf, pConsumer->pid); tlen += taosEncodeSEpSet(buf, &pConsumer->ep); - tlen += taosEncodeFixedI64(buf, pConsumer->upTime); + tlen += taosEncodeFixedI64(buf, pConsumer->createTime); tlen += taosEncodeFixedI64(buf, pConsumer->subscribeTime); tlen += taosEncodeFixedI64(buf, pConsumer->rebalanceTime); @@ -339,7 +343,7 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer) { buf = taosDecodeFixedI32(buf, &pConsumer->pid); buf = taosDecodeSEpSet(buf, &pConsumer->ep); - buf = taosDecodeFixedI64(buf, &pConsumer->upTime); + buf = taosDecodeFixedI64(buf, &pConsumer->createTime); buf = taosDecodeFixedI64(buf, &pConsumer->subscribeTime); buf = taosDecodeFixedI64(buf, &pConsumer->rebalanceTime); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index ed1a602cd3fd08288158e4efe97bd980608203e8..ce3a4ea048c18d185a0397208a7d650fdca02fff 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -160,10 +160,10 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, SMqSubscribeObj static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub, const SMqRebOutputVg *pRebVg, SSubplan* pPlan) { -// if (pRebVg->oldConsumerId == pRebVg->newConsumerId) { -// terrno = TSDB_CODE_MND_INVALID_SUB_OPTION; -// return -1; -// } + if (pRebVg->oldConsumerId == pRebVg->newConsumerId) { + terrno = TSDB_CODE_MND_INVALID_SUB_OPTION; + return -1; + } void *buf; int32_t tlen; @@ -175,7 +175,7 @@ static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubsc SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); if (pVgObj == NULL) { taosMemoryFree(buf); - terrno = TSDB_CODE_OUT_OF_MEMORY; + terrno = TSDB_CODE_MND_VGROUP_NOT_EXIST; return -1; } @@ -296,17 +296,17 @@ static void addUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) { } } -static void putNoTransferToOutput(SMqRebOutputObj *pOutput, SMqConsumerEp *pConsumerEp){ - for(int i = 0; i < taosArrayGetSize(pConsumerEp->vgs); i++){ - SMqVgEp *pVgEp = (SMqVgEp *)taosArrayGetP(pConsumerEp->vgs, i); - SMqRebOutputVg outputVg = { - .oldConsumerId = pConsumerEp->consumerId, - .newConsumerId = pConsumerEp->consumerId, - .pVgEp = pVgEp, - }; - taosArrayPush(pOutput->rebVgs, &outputVg); - } -} +//static void putNoTransferToOutput(SMqRebOutputObj *pOutput, SMqConsumerEp *pConsumerEp){ +// for(int i = 0; i < taosArrayGetSize(pConsumerEp->vgs); i++){ +// SMqVgEp *pVgEp = (SMqVgEp *)taosArrayGetP(pConsumerEp->vgs, i); +// SMqRebOutputVg outputVg = { +// .oldConsumerId = pConsumerEp->consumerId, +// .newConsumerId = pConsumerEp->consumerId, +// .pVgEp = pVgEp, +// }; +// taosArrayPush(pOutput->rebVgs, &outputVg); +// } +//} static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt, int32_t imbConsumerNum) { @@ -357,7 +357,7 @@ static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHas } } } - putNoTransferToOutput(pOutput, pConsumerEp); +// putNoTransferToOutput(pOutput, pConsumerEp); } } @@ -540,50 +540,44 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu return -1; } + char topic[TSDB_TOPIC_FNAME_LEN] = {0}; + char cgroup[TSDB_CGROUP_LEN] = {0}; + mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true); + // 3. commit log: consumer to update status and epoch // 3.1 set touched consumer int32_t consumerNum = taosArrayGetSize(pOutput->modifyConsumers); for (int32_t i = 0; i < consumerNum; i++) { int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->modifyConsumers, i); - SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId); - SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup); - pConsumerNew->updateType = CONSUMER_UPDATE__TOUCH; - mndReleaseConsumer(pMnode, pConsumerOld); + SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup); + pConsumerNew->updateType = CONSUMER_UPDATE_REB_MODIFY_NOTOPIC; if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { - tDeleteSMqConsumerObj(pConsumerNew); - taosMemoryFree(pConsumerNew); + tDeleteSMqConsumerObj(pConsumerNew, true); mndTransDrop(pTrans); return -1; } - tDeleteSMqConsumerObj(pConsumerNew); - taosMemoryFree(pConsumerNew); + tDeleteSMqConsumerObj(pConsumerNew, true); } // 3.2 set new consumer consumerNum = taosArrayGetSize(pOutput->newConsumers); for (int32_t i = 0; i < consumerNum; i++) { int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->newConsumers, i); + SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup); + pConsumerNew->updateType = CONSUMER_UPDATE_REB_MODIFY_TOPIC; - SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId); - SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup); - pConsumerNew->updateType = CONSUMER_UPDATE__ADD; - char *topic = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN); - char cgroup[TSDB_CGROUP_LEN]; - mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true); - taosArrayPush(pConsumerNew->rebNewTopics, &topic); - mndReleaseConsumer(pMnode, pConsumerOld); + char* topicTmp = taosStrdup(topic); + taosArrayPush(pConsumerNew->rebNewTopics, &topicTmp); if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { - tDeleteSMqConsumerObj(pConsumerNew); - taosMemoryFree(pConsumerNew); + tDeleteSMqConsumerObj(pConsumerNew, true); mndTransDrop(pTrans); return -1; } - tDeleteSMqConsumerObj(pConsumerNew); - taosMemoryFree(pConsumerNew); + tDeleteSMqConsumerObj(pConsumerNew, true); } // 3.3 set removed consumer @@ -591,24 +585,19 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu for (int32_t i = 0; i < consumerNum; i++) { int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->removedConsumers, i); - SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId); - SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup); - pConsumerNew->updateType = CONSUMER_UPDATE__REMOVE; - char *topic = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN); - char cgroup[TSDB_CGROUP_LEN]; - mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true); - taosArrayPush(pConsumerNew->rebRemovedTopics, &topic); - mndReleaseConsumer(pMnode, pConsumerOld); + SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup); + pConsumerNew->updateType = CONSUMER_UPDATE_REB_MODIFY_REMOVE; + + char* topicTmp = taosStrdup(topic); + taosArrayPush(pConsumerNew->rebRemovedTopics, &topicTmp); if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { - tDeleteSMqConsumerObj(pConsumerNew); - taosMemoryFree(pConsumerNew); + tDeleteSMqConsumerObj(pConsumerNew, true); mndTransDrop(pTrans); return -1; } - tDeleteSMqConsumerObj(pConsumerNew); - taosMemoryFree(pConsumerNew); + tDeleteSMqConsumerObj(pConsumerNew, true); } // 4. TODO commit log: modification log @@ -762,6 +751,20 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { return -1; } + void *pIter = NULL; + SMqConsumerObj *pConsumer; + while (1) { + pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); + if (pIter == NULL) { + break; + } + + if (strcmp(dropReq.cgroup, pConsumer->cgroup) == 0) { + mndDropConsumerFromSdb(pMnode, pConsumer->consumerId); + } + sdbRelease(pMnode->pSdb, pConsumer); + } + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "drop-cgroup"); if (pTrans == NULL) { mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr()); @@ -1107,8 +1110,12 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock colDataSetVal(pColInfo, numOfRows, (const char *)&pVgEp->vgId, false); // consumer id + char consumerIdHex[32] = {0}; + sprintf(varDataVal(consumerIdHex), "0x%"PRIx64, pConsumerEp->consumerId); + varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex))); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumerEp->consumerId, false); + colDataSetVal(pColInfo, numOfRows, (const char *)consumerIdHex, false); mDebug("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic), pConsumerEp->consumerId, varDataVal(cgroup), pVgEp->vgId); diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 4d12f2433fc0b37c3407a5f2cf08fe601b86bcab..551b08d2be0d0bcc7aaff6fbeaacf30a61b45d16 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -585,6 +585,11 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) { SMqTopicObj *pTopic = NULL; SDbObj *pDb = NULL; SCMCreateTopicReq createTopicReq = {0}; + if (sdbGetSize(pMnode->pSdb, SDB_TOPIC) >= tmqMaxTopicNum){ + terrno = TSDB_CODE_TMQ_TOPIC_OUT_OF_RANGE; + mError("topic num out of range"); + return code; + } if (tDeserializeSCMCreateTopicReq(pReq->pCont, pReq->contLen, &createTopicReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; @@ -697,7 +702,11 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { break; } - if (pConsumer->status == MQ_CONSUMER_STATUS__LOST_REBD) continue; + if (pConsumer->status == MQ_CONSUMER_STATUS_LOST){ + mndDropConsumerFromSdb(pMnode, pConsumer->consumerId); + mndReleaseConsumer(pMnode, pConsumer); + continue; + } int32_t sz = taosArrayGetSize(pConsumer->assignedTopics); for (int32_t i = 0; i < sz; i++) { diff --git a/source/util/src/terror.c b/source/util/src/terror.c index e4a00c1fc95e6e7c0143fbfecea07ec5a97158ae..e6ffec85ec0def87d1630251e4db5aefc0763634 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -629,7 +629,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_INVALID_FILE, "Index file is inval TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_MISMATCH, "Consumer mismatch") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_CLOSED, "Consumer closed") -TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_ERROR, "Consumer error, to see log") +TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_ERROR, "Consumer error, to see log") +TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_TOPIC_OUT_OF_RANGE, "Topic num out of range") +TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE, "Group num out of range 100") // stream TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist")