提交 10478112 编写于 作者: B Benguang Zhao

fix: update commit index from heartbeat on learner in the same way as on follower

上级 03ffbdd4
...@@ -2414,18 +2414,19 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -2414,18 +2414,19 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
(void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId); (void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId);
SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont; SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
pSyncMsg->cmd = SYNC_LOCAL_CMD_FOLLOWER_CMT; pSyncMsg->cmd =
(ths->state == TAOS_SYNC_STATE_LEARNER) ? SYNC_LOCAL_CMD_LEARNER_CMT : SYNC_LOCAL_CMD_FOLLOWER_CMT;
pSyncMsg->commitIndex = pMsg->commitIndex; pSyncMsg->commitIndex = pMsg->commitIndex;
pSyncMsg->currentTerm = pMsg->term; pSyncMsg->currentTerm = pMsg->term;
SyncIndex fcIndex = pSyncMsg->commitIndex;
if (ths->syncEqMsg != NULL && ths->msgcb != NULL) { if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd); int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
if (code != 0) { if (code != 0) {
sError("vgId:%d, sync enqueue fc-commit msg error, code:%d", ths->vgId, code); sError("vgId:%d, failed to enqueue commit msg from heartbeat since %s, code:%d", ths->vgId, terrstr(), code);
rpcFreeCont(rpcMsgLocalCmd.pCont); rpcFreeCont(rpcMsgLocalCmd.pCont);
} else { } else {
sTrace("vgId:%d, sync enqueue fc-commit msg, fc-index:%" PRId64, ths->vgId, fcIndex); sTrace("vgId:%d, enqueue commit msg from heartbeat, commit-index:%" PRId64 ", term:%" PRId64, ths->vgId,
pMsg->commitIndex, pMsg->term);
} }
} }
} }
...@@ -2451,26 +2452,6 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -2451,26 +2452,6 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
} }
} }
if (pMsg->term >= currentTerm && ths->state == TAOS_SYNC_STATE_LEARNER) {
SRpcMsg rpcMsgLocalCmd = {0};
(void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId);
SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
pSyncMsg->cmd = SYNC_LOCAL_CMD_LEARNER_CMT;
pSyncMsg->currentTerm = pMsg->term;
pSyncMsg->commitIndex = pMsg->commitIndex;
if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
if (code != 0) {
sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code);
rpcFreeCont(rpcMsgLocalCmd.pCont);
} else {
sTrace("vgId:%d, sync enqueue step-down msg, new-term:%" PRId64, ths->vgId, pMsg->term);
}
}
}
// reply // reply
syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg); syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);
...@@ -2521,7 +2502,7 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -2521,7 +2502,7 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) { if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
syncNodeStepDown(ths, pMsg->currentTerm); syncNodeStepDown(ths, pMsg->currentTerm);
} else if (pMsg->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) { } else if (pMsg->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT || pMsg->cmd == SYNC_LOCAL_CMD_LEARNER_CMT) {
if (syncLogBufferIsEmpty(ths->pLogBuf)) { if (syncLogBufferIsEmpty(ths->pLogBuf)) {
sError("vgId:%d, sync log buffer is empty.", ths->vgId); sError("vgId:%d, sync log buffer is empty.", ths->vgId);
return 0; return 0;
...@@ -2534,20 +2515,7 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -2534,20 +2515,7 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
sError("vgId:%d, failed to commit raft log since %s. commit index:%" PRId64 "", ths->vgId, terrstr(), sError("vgId:%d, failed to commit raft log since %s. commit index:%" PRId64 "", ths->vgId, terrstr(),
ths->commitIndex); ths->commitIndex);
} }
} else if (pMsg->cmd == SYNC_LOCAL_CMD_LEARNER_CMT){ } else {
if (syncLogBufferIsEmpty(ths->pLogBuf)) {
sError("vgId:%d, sync log buffer is empty.", ths->vgId);
return 0;
}
raftStoreSetTerm(ths, pMsg->currentTerm);
(void)syncNodeUpdateCommitIndex(ths, pMsg->commitIndex);
sTrace("vgId:%d, start to commit raft log in heartbeat. commit index:%" PRId64 "", ths->vgId, ths->commitIndex);
if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
sError("vgId:%d, failed to commit raft log since %s. commit index:%" PRId64 "", ths->vgId, terrstr(),
ths->commitIndex);
}
}
else {
sError("error local cmd"); sError("error local cmd");
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册