提交 79342cf1 编写于 作者: H Haojun Liao

refactor(tqm): do some internal refactor.

上级 3b814f24
......@@ -23,13 +23,12 @@ extern "C" {
#endif
enum {
MQ_CONSUMER_STATUS__MODIFY = 1,
MQ_CONSUMER_STATUS_REBALANCE = 1,
// MQ_CONSUMER_STATUS__MODIFY_IN_REB, // this value is not used anymore
MQ_CONSUMER_STATUS__READY,
MQ_CONSUMER_STATUS__LOST,
// MQ_CONSUMER_STATUS__LOST_IN_REB, // this value is not used anymore
MQ_CONSUMER_STATUS__LOST_REBD,
MQ_CONSUMER_STATUS__REMOVED,
};
int32_t mndInitConsumer(SMnode *pMnode);
......
......@@ -142,7 +142,7 @@ typedef enum {
CONSUMER_UPDATE__REMOVE,
CONSUMER_UPDATE__LOST,
CONSUMER_UPDATE__RECOVER,
CONSUMER_UPDATE__MODIFY, // subscribe req need change consume topic
CONSUMER_UPDATE__REBALANCE, // subscribe req need change consume topic
} ECsmUpdateType;
typedef struct {
......
......@@ -192,6 +192,7 @@ FAIL:
return -1;
}
// todo check the clear process
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node;
SMqConsumerClearMsg *pClearMsg = pMsg->pCont;
......@@ -302,11 +303,10 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
pLostMsg->consumerId = pConsumer->consumerId;
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_TMQ_CONSUMER_LOST,
.pCont = pLostMsg,
.contLen = sizeof(SMqConsumerLostMsg),
};
.msgType = TDMT_MND_TMQ_CONSUMER_LOST, .pCont = pLostMsg, .contLen = sizeof(SMqConsumerLostMsg)};
mDebug("consumer:0x%"PRIx64" hb not received beyond threshold %d, set to lost", pConsumer->consumerId,
MND_CONSUMER_LOST_HB_CNT);
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
} else if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
......@@ -316,11 +316,10 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
pClearMsg->consumerId = pConsumer->consumerId;
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_TMQ_LOST_CONSUMER_CLEAR,
.pCont = pClearMsg,
.contLen = sizeof(SMqConsumerClearMsg),
};
.msgType = TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, .pCont = pClearMsg, .contLen = sizeof(SMqConsumerClearMsg)};
mDebug("consumer:0x%" PRIx64 " lost beyond threshold %d, clear it", pConsumer->consumerId,
MND_CONSUMER_LOST_CLEAR_THRESHOLD);
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
} else if (status == MQ_CONSUMER_STATUS__LOST) {
......@@ -334,7 +333,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
}
taosRUnLockLatch(&pConsumer->lock);
} else {
} else { // MQ_CONSUMER_STATUS_REBALANCE
taosRLockLatch(&pConsumer->lock);
int32_t newTopicNum = taosArrayGetSize(pConsumer->rebNewTopics);
......@@ -660,7 +659,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
tstrncpy(pConsumerNew->clientId, subscribe.clientId, tListLen(pConsumerNew->clientId));
// set the update type
pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
pConsumerNew->updateType = CONSUMER_UPDATE__REBALANCE;
taosArrayDestroy(pConsumerNew->assignedTopics);
pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup);
......@@ -691,7 +690,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
}
// set the update type
pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
pConsumerNew->updateType = CONSUMER_UPDATE__REBALANCE;
taosArrayDestroy(pConsumerNew->assignedTopics);
pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup);
......@@ -870,9 +869,10 @@ static void updateConsumerStatus(SMqConsumerObj *pConsumer) {
int32_t status = pConsumer->status;
if (taosArrayGetSize(pConsumer->rebNewTopics) == 0 && taosArrayGetSize(pConsumer->rebRemovedTopics) == 0) {
if (status == MQ_CONSUMER_STATUS__MODIFY) {
if (status == MQ_CONSUMER_STATUS_REBALANCE) {
pConsumer->status = MQ_CONSUMER_STATUS__READY;
} else if (status == MQ_CONSUMER_STATUS__LOST) {
ASSERT(taosArrayGetSize(pConsumer->currentTopics) == 0 && taosArrayGetSize(pConsumer->assignedTopics) == 0);
pConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD;
}
}
......@@ -913,21 +913,13 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
taosWLockLatch(&pOldConsumer->lock);
if (pNewConsumer->updateType == CONSUMER_UPDATE__MODIFY) {
SArray *tmp = pOldConsumer->rebNewTopics;
pOldConsumer->rebNewTopics = pNewConsumer->rebNewTopics;
pNewConsumer->rebNewTopics = tmp;
tmp = pOldConsumer->rebRemovedTopics;
pOldConsumer->rebRemovedTopics = pNewConsumer->rebRemovedTopics;
pNewConsumer->rebRemovedTopics = tmp;
tmp = pOldConsumer->assignedTopics;
pOldConsumer->assignedTopics = pNewConsumer->assignedTopics;
pNewConsumer->assignedTopics = tmp;
if (pNewConsumer->updateType == CONSUMER_UPDATE__REBALANCE) {
TSWAP(pOldConsumer->rebNewTopics, pNewConsumer->rebNewTopics);
TSWAP(pOldConsumer->rebRemovedTopics, pNewConsumer->rebRemovedTopics);
TSWAP(pOldConsumer->assignedTopics, pNewConsumer->assignedTopics);
pOldConsumer->subscribeTime = pNewConsumer->upTime;
pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__LOST) {
int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics);
for (int32_t i = 0; i < sz; i++) {
......@@ -937,10 +929,10 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
pOldConsumer->rebalanceTime = pNewConsumer->upTime;
int32_t status = pOldConsumer->status;
int32_t prevStatus = pOldConsumer->status;
pOldConsumer->status = MQ_CONSUMER_STATUS__LOST;
mDebug("consumer:0x%" PRIx64 " state %s -> %s, reb-time:%" PRId64 ", reb-removed-topics:%d",
pOldConsumer->consumerId, mndConsumerStatusName(status), mndConsumerStatusName(pOldConsumer->status),
pOldConsumer->consumerId, mndConsumerStatusName(prevStatus), mndConsumerStatusName(pOldConsumer->status),
pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__RECOVER) {
int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics);
......@@ -950,8 +942,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
}
pOldConsumer->rebalanceTime = pNewConsumer->upTime;
pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__TOUCH) {
atomic_add_fetch_32(&pOldConsumer->epoch, 1);
......@@ -1160,7 +1151,7 @@ static const char *mndConsumerStatusName(int status) {
case MQ_CONSUMER_STATUS__LOST:
case MQ_CONSUMER_STATUS__LOST_REBD:
return "lost";
case MQ_CONSUMER_STATUS__MODIFY:
case MQ_CONSUMER_STATUS_REBALANCE:
return "rebalancing";
default:
return "unknown";
......
......@@ -225,7 +225,7 @@ SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_L
memcpy(pConsumer->cgroup, cgroup, TSDB_CGROUP_LEN);
pConsumer->epoch = 0;
pConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
pConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
pConsumer->hbStatus = 0;
taosInitRWLatch(&pConsumer->lock);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册