diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 8a0a6be8fdcb62ca4750b36065c8f95b32b23a79..e3342d76ee8d986d92b5c88a47da7e5a518f80ae 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2414,18 +2414,19 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { (void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId); SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont; - pSyncMsg->cmd = SYNC_LOCAL_CMD_FOLLOWER_CMT; + pSyncMsg->cmd = + (ths->state == TAOS_SYNC_STATE_LEARNER) ? SYNC_LOCAL_CMD_LEARNER_CMT : SYNC_LOCAL_CMD_FOLLOWER_CMT; pSyncMsg->commitIndex = pMsg->commitIndex; pSyncMsg->currentTerm = pMsg->term; - SyncIndex fcIndex = pSyncMsg->commitIndex; if (ths->syncEqMsg != NULL && ths->msgcb != NULL) { int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd); if (code != 0) { - sError("vgId:%d, sync enqueue fc-commit msg error, code:%d", ths->vgId, code); + sError("vgId:%d, failed to enqueue commit msg from heartbeat since %s, code:%d", ths->vgId, terrstr(), code); rpcFreeCont(rpcMsgLocalCmd.pCont); } else { - sTrace("vgId:%d, sync enqueue fc-commit msg, fc-index:%" PRId64, ths->vgId, fcIndex); + sTrace("vgId:%d, enqueue commit msg from heartbeat, commit-index:%" PRId64 ", term:%" PRId64, ths->vgId, + pMsg->commitIndex, pMsg->term); } } } @@ -2451,26 +2452,6 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { } } - if (pMsg->term >= currentTerm && ths->state == TAOS_SYNC_STATE_LEARNER) { - SRpcMsg rpcMsgLocalCmd = {0}; - (void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId); - - SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont; - pSyncMsg->cmd = SYNC_LOCAL_CMD_LEARNER_CMT; - pSyncMsg->currentTerm = pMsg->term; - pSyncMsg->commitIndex = pMsg->commitIndex; - - if (ths->syncEqMsg != NULL && ths->msgcb != NULL) { - int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd); - if (code != 0) { - sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code); - rpcFreeCont(rpcMsgLocalCmd.pCont); - } else { - sTrace("vgId:%d, sync enqueue step-down msg, new-term:%" PRId64, ths->vgId, pMsg->term); - } - } - } - // reply syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg); @@ -2521,7 +2502,7 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) { if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) { syncNodeStepDown(ths, pMsg->currentTerm); - } else if (pMsg->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) { + } else if (pMsg->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT || pMsg->cmd == SYNC_LOCAL_CMD_LEARNER_CMT) { if (syncLogBufferIsEmpty(ths->pLogBuf)) { sError("vgId:%d, sync log buffer is empty.", ths->vgId); return 0; @@ -2534,20 +2515,7 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) { sError("vgId:%d, failed to commit raft log since %s. commit index:%" PRId64 "", ths->vgId, terrstr(), ths->commitIndex); } - } else if (pMsg->cmd == SYNC_LOCAL_CMD_LEARNER_CMT){ - if (syncLogBufferIsEmpty(ths->pLogBuf)) { - sError("vgId:%d, sync log buffer is empty.", ths->vgId); - return 0; - } - raftStoreSetTerm(ths, pMsg->currentTerm); - (void)syncNodeUpdateCommitIndex(ths, pMsg->commitIndex); - sTrace("vgId:%d, start to commit raft log in heartbeat. commit index:%" PRId64 "", ths->vgId, ths->commitIndex); - if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) { - sError("vgId:%d, failed to commit raft log since %s. commit index:%" PRId64 "", ths->vgId, terrstr(), - ths->commitIndex); - } - } - else { + } else { sError("error local cmd"); }