提交 5de46007 编写于 作者: wmmhello's avatar wmmhello

fix:remove lost_reb status

上级 a7a518aa
......@@ -145,7 +145,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DROP_TOPIC, "drop-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_SUBSCRIBE, "subscribe", SCMSubscribeReq, SCMSubscribeRsp)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_ASK_EP, "ask-ep", SMqAskEpReq, SMqAskEpRsp)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_CONSUMER_LOST, "consumer-lost", SMqConsumerLostMsg, NULL)
// TD_DEF_MSG_TYPE(TDMT_MND_TMQ_CONSUMER_LOST, "consumer-lost", SMqConsumerLostMsg, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_CONSUMER_RECOVER, "consumer-recover", SMqConsumerRecoverMsg, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_HB, "consumer-hb", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DO_REBALANCE, "do-rebalance", SMqDoRebalanceMsg, NULL)
......
......@@ -25,11 +25,11 @@ extern "C" {
enum {
MQ_CONSUMER_STATUS_REBALANCE = 1,
// MQ_CONSUMER_STATUS__MODIFY_IN_REB, // this value is not used anymore
MQ_CONSUMER_STATUS__READY,
MQ_CONSUMER_STATUS__LOST,
MQ_CONSUMER_STATUS_READY,
MQ_CONSUMER_STATUS_LOST,
// MQ_CONSUMER_STATUS__LOST_IN_REB, // this value is not used anymore
MQ_CONSUMER_STATUS__LOST_REBD,
};
// MQ_CONSUMER_STATUS__LOST_REBD,
};\
int32_t mndInitConsumer(SMnode *pMnode);
void mndCleanupConsumer(SMnode *pMnode);
......
......@@ -140,7 +140,7 @@ typedef enum {
CONSUMER_UPDATE_REB_MODIFY_NOTOPIC = 1, // topic do not need modified after rebalance
CONSUMER_UPDATE_REB_MODIFY_TOPIC, // topic need modified after rebalance
CONSUMER_UPDATE_REB_MODIFY_REMOVE, // topic need removed after rebalance
CONSUMER_UPDATE_TIMER_LOST,
// CONSUMER_UPDATE_TIMER_LOST,
CONSUMER_UPDATE_RECOVER,
CONSUMER_UPDATE_SUB_MODIFY, // modify after subscribe req
} ECsmUpdateType;
......
......@@ -63,7 +63,7 @@ int32_t mndInitConsumer(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq);
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq);
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessMqTimerMsg);
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_LOST, mndProcessConsumerLostMsg);
// mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_LOST, mndProcessConsumerLostMsg);
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg);
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg);
......@@ -105,48 +105,48 @@ void mndRebCntDec() {
}
}
static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node;
SMqConsumerLostMsg *pLostMsg = pMsg->pCont;
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pLostMsg->consumerId);
if (pConsumer == NULL) {
return 0;
}
mInfo("process consumer lost msg, consumer:0x%" PRIx64 " status:%d(%s)", pLostMsg->consumerId, pConsumer->status,
mndConsumerStatusName(pConsumer->status));
if (pConsumer->status != MQ_CONSUMER_STATUS__READY) {
mndReleaseConsumer(pMnode, pConsumer);
return -1;
}
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
pConsumerNew->updateType = CONSUMER_UPDATE_TIMER_LOST;
mndReleaseConsumer(pMnode, pConsumer);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "lost-csm");
if (pTrans == NULL) {
goto FAIL;
}
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
goto FAIL;
}
if (mndTransPrepare(pMnode, pTrans) != 0) {
goto FAIL;
}
tDeleteSMqConsumerObj(pConsumerNew, true);
mndTransDrop(pTrans);
return 0;
FAIL:
tDeleteSMqConsumerObj(pConsumerNew, true);
mndTransDrop(pTrans);
return -1;
}
//static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
// SMnode *pMnode = pMsg->info.node;
// SMqConsumerLostMsg *pLostMsg = pMsg->pCont;
// SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pLostMsg->consumerId);
// if (pConsumer == NULL) {
// return 0;
// }
//
// mInfo("process consumer lost msg, consumer:0x%" PRIx64 " status:%d(%s)", pLostMsg->consumerId, pConsumer->status,
// mndConsumerStatusName(pConsumer->status));
//
// if (pConsumer->status != MQ_CONSUMER_STATUS_READY) {
// mndReleaseConsumer(pMnode, pConsumer);
// return -1;
// }
//
// SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
// pConsumerNew->updateType = CONSUMER_UPDATE_TIMER_LOST;
//
// mndReleaseConsumer(pMnode, pConsumer);
//
// STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "lost-csm");
// if (pTrans == NULL) {
// goto FAIL;
// }
//
// if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
// goto FAIL;
// }
//
// if (mndTransPrepare(pMnode, pTrans) != 0) {
// goto FAIL;
// }
//
// tDeleteSMqConsumerObj(pConsumerNew, true);
// mndTransDrop(pTrans);
// return 0;
//FAIL:
// tDeleteSMqConsumerObj(pConsumerNew, true);
// mndTransDrop(pTrans);
// return -1;
//}
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node;
......@@ -160,7 +160,7 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
mInfo("receive consumer recover msg, consumer:0x%" PRIx64 " status:%d(%s)", pRecoverMsg->consumerId,
pConsumer->status, mndConsumerStatusName(pConsumer->status));
if (pConsumer->status != MQ_CONSUMER_STATUS__LOST) {
if (pConsumer->status != MQ_CONSUMER_STATUS_LOST) {
mndReleaseConsumer(pMnode, pConsumer);
terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
return -1;
......@@ -204,7 +204,7 @@ static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
mInfo("consumer:0x%" PRIx64 " needs to be cleared, status %s", pClearMsg->consumerId,
mndConsumerStatusName(pConsumer->status));
if (pConsumer->status != MQ_CONSUMER_STATUS__LOST_REBD) {
if (pConsumer->status != MQ_CONSUMER_STATUS_LOST) {
mndReleaseConsumer(pMnode, pConsumer);
return -1;
}
......@@ -299,26 +299,37 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->createTime,
hbStatus);
if (status == MQ_CONSUMER_STATUS__READY) {
if (status == MQ_CONSUMER_STATUS_READY) {
if (hbStatus > MND_CONSUMER_LOST_HB_CNT) {
SMqConsumerLostMsg *pLostMsg = rpcMallocCont(sizeof(SMqConsumerLostMsg));
if (pLostMsg == NULL) {
mError("consumer:0x%"PRIx64" failed to transfer consumer status to lost due to out of memory. alloc size:%d",
pConsumer->consumerId, (int32_t)sizeof(SMqConsumerLostMsg));
continue;
// SMqConsumerLostMsg *pLostMsg = rpcMallocCont(sizeof(SMqConsumerLostMsg));
// if (pLostMsg == NULL) {
// mError("consumer:0x%"PRIx64" failed to transfer consumer status to lost due to out of memory. alloc size:%d",
// pConsumer->consumerId, (int32_t)sizeof(SMqConsumerLostMsg));
// continue;
// }
//
// pLostMsg->consumerId = pConsumer->consumerId;
// SRpcMsg rpcMsg = {
// .msgType = TDMT_MND_TMQ_CONSUMER_LOST, .pCont = pLostMsg, .contLen = sizeof(SMqConsumerLostMsg)};
//
// mDebug("consumer:0x%"PRIx64" hb not received beyond threshold %d, set to lost", pConsumer->consumerId,
// MND_CONSUMER_LOST_HB_CNT);
// tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
taosRLockLatch(&pConsumer->lock);
int32_t topicNum = taosArrayGetSize(pConsumer->currentTopics);
for (int32_t i = 0; i < topicNum; i++) {
char key[TSDB_SUBSCRIBE_KEY_LEN];
char *removedTopic = taosArrayGetP(pConsumer->currentTopics, i);
mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
}
pLostMsg->consumerId = pConsumer->consumerId;
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_TMQ_CONSUMER_LOST, .pCont = pLostMsg, .contLen = sizeof(SMqConsumerLostMsg)};
mDebug("consumer:0x%"PRIx64" hb not received beyond threshold %d, set to lost", pConsumer->consumerId,
MND_CONSUMER_LOST_HB_CNT);
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
taosRUnLockLatch(&pConsumer->lock);
}
} else if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
} else if (status == MQ_CONSUMER_STATUS_LOST) {
// if the client is lost longer than one day, clear it. Otherwise, do nothing about the lost consumers.
if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) {
if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD || taosArrayGetSize(pConsumer->assignedTopics) == 0) { // clear consumer if lost a day or unsubscribe/close
SMqConsumerClearMsg *pClearMsg = rpcMallocCont(sizeof(SMqConsumerClearMsg));
if (pClearMsg == NULL) {
mError("consumer:0x%"PRIx64" failed to clear consumer due to out of memory. alloc size:%d",
......@@ -334,17 +345,6 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
MND_CONSUMER_LOST_CLEAR_THRESHOLD);
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
} else if (status == MQ_CONSUMER_STATUS__LOST) {
taosRLockLatch(&pConsumer->lock);
int32_t topicNum = taosArrayGetSize(pConsumer->currentTopics);
for (int32_t i = 0; i < topicNum; i++) {
char key[TSDB_SUBSCRIBE_KEY_LEN];
char *removedTopic = taosArrayGetP(pConsumer->currentTopics, i);
mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
}
taosRUnLockLatch(&pConsumer->lock);
} else { // MQ_CONSUMER_STATUS_REBALANCE
taosRLockLatch(&pConsumer->lock);
......@@ -409,7 +409,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
int32_t status = atomic_load_32(&pConsumer->status);
if (status == MQ_CONSUMER_STATUS__LOST) {
if (status == MQ_CONSUMER_STATUS_LOST) {
mInfo("try to recover consumer:0x%" PRIx64 "", consumerId);
SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));
......@@ -460,7 +460,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
// 1. check consumer status
int32_t status = atomic_load_32(&pConsumer->status);
if (status == MQ_CONSUMER_STATUS__LOST) {
if (status == MQ_CONSUMER_STATUS_LOST) {
mInfo("try to recover consumer:0x%" PRIx64, consumerId);
SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));
......@@ -474,7 +474,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
}
if (status != MQ_CONSUMER_STATUS__READY) {
if (status != MQ_CONSUMER_STATUS_READY) {
mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status));
terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
return -1;
......@@ -692,7 +692,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
" cgroup:%s, current status:%d(%s), subscribe topic num: %d",
consumerId, subscribe.cgroup, status, mndConsumerStatusName(status), newTopicNum);
if (status != MQ_CONSUMER_STATUS__READY) {
if (status != MQ_CONSUMER_STATUS_READY) {
terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
goto _over;
}
......@@ -881,9 +881,9 @@ static void updateConsumerStatus(SMqConsumerObj *pConsumer) {
if (taosArrayGetSize(pConsumer->rebNewTopics) == 0 && taosArrayGetSize(pConsumer->rebRemovedTopics) == 0) {
if (status == MQ_CONSUMER_STATUS_REBALANCE) {
pConsumer->status = MQ_CONSUMER_STATUS__READY;
} else if (status == MQ_CONSUMER_STATUS__LOST) {
pConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD;
pConsumer->status = MQ_CONSUMER_STATUS_READY;
} else if (status == MQ_CONSUMER_STATUS_READY) {
pConsumer->status = MQ_CONSUMER_STATUS_LOST;
}
}
}
......@@ -964,18 +964,18 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
pOldConsumer->subscribeTime = taosGetTimestampMs();
pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
mInfo("consumer:0x%" PRIx64 " sub update, modify existed consumer",pOldConsumer->consumerId);
} else if (pNewConsumer->updateType == CONSUMER_UPDATE_TIMER_LOST) {
int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics);
for (int32_t i = 0; i < sz; i++) {
char *topic = taosStrdup(taosArrayGetP(pOldConsumer->currentTopics, i));
taosArrayPush(pOldConsumer->rebRemovedTopics, &topic);
}
int32_t prevStatus = pOldConsumer->status;
pOldConsumer->status = MQ_CONSUMER_STATUS__LOST;
mInfo("consumer:0x%" PRIx64 " timer update, timer lost. state %s -> %s, reb-time:%" PRId64 ", reb-removed-topics:%d",
pOldConsumer->consumerId, mndConsumerStatusName(prevStatus), mndConsumerStatusName(pOldConsumer->status),
pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
// } else if (pNewConsumer->updateType == CONSUMER_UPDATE_TIMER_LOST) {
// int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics);
// for (int32_t i = 0; i < sz; i++) {
// char *topic = taosStrdup(taosArrayGetP(pOldConsumer->currentTopics, i));
// taosArrayPush(pOldConsumer->rebRemovedTopics, &topic);
// }
//
// int32_t prevStatus = pOldConsumer->status;
// pOldConsumer->status = MQ_CONSUMER_STATUS_LOST;
// mInfo("consumer:0x%" PRIx64 " timer update, timer lost. state %s -> %s, reb-time:%" PRId64 ", reb-removed-topics:%d",
// pOldConsumer->consumerId, mndConsumerStatusName(prevStatus), mndConsumerStatusName(pOldConsumer->status),
// pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
} else if (pNewConsumer->updateType == CONSUMER_UPDATE_RECOVER) {
int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics);
for (int32_t i = 0; i < sz; i++) {
......@@ -1176,10 +1176,9 @@ static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
static const char *mndConsumerStatusName(int status) {
switch (status) {
case MQ_CONSUMER_STATUS__READY:
case MQ_CONSUMER_STATUS_READY:
return "ready";
case MQ_CONSUMER_STATUS__LOST:
case MQ_CONSUMER_STATUS__LOST_REBD:
case MQ_CONSUMER_STATUS_LOST:
return "lost";
case MQ_CONSUMER_STATUS_REBALANCE:
return "rebalancing";
......
......@@ -724,6 +724,23 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
return -1;
}
void *pIter = NULL;
SMqConsumerObj *pConsumer;
while (1) {
pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
if (pIter == NULL) {
break;
}
if (strcmp(dropReq.cgroup, pConsumer->cgroup) == 0) {
sdbRelease(pMnode->pSdb, pConsumer);
terrno = TSDB_CODE_MND_CGROUP_USED;
mError("cgroup:%s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
return -1;
}
sdbRelease(pMnode->pSdb, pConsumer);
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "drop-cgroup");
if (pTrans == NULL) {
mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
......
......@@ -697,8 +697,6 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
break;
}
if (pConsumer->status == MQ_CONSUMER_STATUS__LOST_REBD) continue;
int32_t sz = taosArrayGetSize(pConsumer->assignedTopics);
for (int32_t i = 0; i < sz; i++) {
char *name = taosArrayGetP(pConsumer->assignedTopics, i);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册