未验证 提交 c3bc54c9 编写于 作者: X Xiaoyu Wang 提交者: GitHub

Merge pull request #21039 from taosdata/FIX/TD-23824-3.0

fix: update commit index from heartbeat on  learner in the same way as on follower
......@@ -2414,18 +2414,19 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
(void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId);
SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
pSyncMsg->cmd = SYNC_LOCAL_CMD_FOLLOWER_CMT;
pSyncMsg->cmd =
(ths->state == TAOS_SYNC_STATE_LEARNER) ? SYNC_LOCAL_CMD_LEARNER_CMT : SYNC_LOCAL_CMD_FOLLOWER_CMT;
pSyncMsg->commitIndex = pMsg->commitIndex;
pSyncMsg->currentTerm = pMsg->term;
SyncIndex fcIndex = pSyncMsg->commitIndex;
if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
if (code != 0) {
sError("vgId:%d, sync enqueue fc-commit msg error, code:%d", ths->vgId, code);
sError("vgId:%d, failed to enqueue commit msg from heartbeat since %s, code:%d", ths->vgId, terrstr(), code);
rpcFreeCont(rpcMsgLocalCmd.pCont);
} else {
sTrace("vgId:%d, sync enqueue fc-commit msg, fc-index:%" PRId64, ths->vgId, fcIndex);
sTrace("vgId:%d, enqueue commit msg from heartbeat, commit-index:%" PRId64 ", term:%" PRId64, ths->vgId,
pMsg->commitIndex, pMsg->term);
}
}
}
......@@ -2451,26 +2452,6 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
}
}
if (pMsg->term >= currentTerm && ths->state == TAOS_SYNC_STATE_LEARNER) {
SRpcMsg rpcMsgLocalCmd = {0};
(void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId);
SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
pSyncMsg->cmd = SYNC_LOCAL_CMD_LEARNER_CMT;
pSyncMsg->currentTerm = pMsg->term;
pSyncMsg->commitIndex = pMsg->commitIndex;
if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
if (code != 0) {
sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code);
rpcFreeCont(rpcMsgLocalCmd.pCont);
} else {
sTrace("vgId:%d, sync enqueue step-down msg, new-term:%" PRId64, ths->vgId, pMsg->term);
}
}
}
// reply
syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);
......@@ -2521,7 +2502,7 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
syncNodeStepDown(ths, pMsg->currentTerm);
} else if (pMsg->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) {
} else if (pMsg->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT || pMsg->cmd == SYNC_LOCAL_CMD_LEARNER_CMT) {
if (syncLogBufferIsEmpty(ths->pLogBuf)) {
sError("vgId:%d, sync log buffer is empty.", ths->vgId);
return 0;
......@@ -2534,20 +2515,7 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
sError("vgId:%d, failed to commit raft log since %s. commit index:%" PRId64 "", ths->vgId, terrstr(),
ths->commitIndex);
}
} else if (pMsg->cmd == SYNC_LOCAL_CMD_LEARNER_CMT){
if (syncLogBufferIsEmpty(ths->pLogBuf)) {
sError("vgId:%d, sync log buffer is empty.", ths->vgId);
return 0;
}
raftStoreSetTerm(ths, pMsg->currentTerm);
(void)syncNodeUpdateCommitIndex(ths, pMsg->commitIndex);
sTrace("vgId:%d, start to commit raft log in heartbeat. commit index:%" PRId64 "", ths->vgId, ths->commitIndex);
if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
sError("vgId:%d, failed to commit raft log since %s. commit index:%" PRId64 "", ths->vgId, terrstr(),
ths->commitIndex);
}
}
else {
} else {
sError("error local cmd");
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册