提交 0c4ade93 编写于 作者: B Benguang Zhao

fix: update sync node commitIndex only if matchTerm equals currentTerm upon heartbeat

上级 c4fad84c
......@@ -247,8 +247,8 @@ typedef struct SyncLocalCmd {
SRaftId destId;
int32_t cmd;
SyncTerm sdNewTerm; // step down new term
SyncIndex fcIndex; // follower commit index
SyncTerm currentTerm; // step down new term
SyncIndex commitIndex; // follower commit index
} SyncLocalCmd;
int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType ttype, uint64_t logicClock, int32_t ms, SSyncNode* pNode);
......
......@@ -98,6 +98,7 @@ int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode);
// access
int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf);
SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf);
int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry);
int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm);
int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm);
......
......@@ -90,6 +90,7 @@
//
int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex) {
ASSERT(false && "deprecated");
if (ths->state != TAOS_SYNC_STATE_FOLLOWER) {
sNTrace(ths, "can not do follower commit");
return -1;
......
......@@ -44,6 +44,7 @@
// /\ UNCHANGED <<messages, serverVars, candidateVars, leaderVars, log>>
//
void syncOneReplicaAdvance(SSyncNode* pSyncNode) {
ASSERT(false && "deprecated");
if (pSyncNode == NULL) {
sError("pSyncNode is NULL");
return;
......
......@@ -1036,6 +1036,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
}
}
pSyncNode->commitIndex = commitIndex;
sInfo("vgId:%d, sync node commitIndex initialized as %" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);
if (syncNodeLogStoreRestoreOnNeed(pSyncNode) < 0) {
goto _error;
......@@ -1176,9 +1177,10 @@ int32_t syncNodeRestore(SSyncNode* pSyncNode) {
}
ASSERT(endIndex == lastVer + 1);
commitIndex = TMAX(pSyncNode->commitIndex, commitIndex);
pSyncNode->commitIndex = TMAX(pSyncNode->commitIndex, commitIndex);
sInfo("vgId:%d, restore sync until commitIndex:%" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);
if (syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, commitIndex) < 0) {
if (syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, pSyncNode->commitIndex) < 0) {
return -1;
}
......@@ -2545,8 +2547,9 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
pSyncMsg->cmd = SYNC_LOCAL_CMD_FOLLOWER_CMT;
pSyncMsg->fcIndex = pMsg->commitIndex;
SyncIndex fcIndex = pSyncMsg->fcIndex;
pSyncMsg->commitIndex = pMsg->commitIndex;
pSyncMsg->currentTerm = pMsg->term;
SyncIndex fcIndex = pSyncMsg->commitIndex;
if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
......@@ -2567,7 +2570,8 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
pSyncMsg->sdNewTerm = pMsg->term;
pSyncMsg->currentTerm = pMsg->term;
pSyncMsg->commitIndex = pMsg->commitIndex;
if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
......@@ -2575,7 +2579,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code);
rpcFreeCont(rpcMsgLocalCmd.pCont);
} else {
sTrace("vgId:%d, sync enqueue step-down msg, new-term: %" PRId64, ths->vgId, pSyncMsg->sdNewTerm);
sTrace("vgId:%d, sync enqueue step-down msg, new-term: %" PRId64, ths->vgId, pSyncMsg->currentTerm);
}
}
}
......@@ -2633,10 +2637,13 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
syncLogRecvLocalCmd(ths, pMsg, "");
if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
syncNodeStepDown(ths, pMsg->sdNewTerm);
syncNodeStepDown(ths, pMsg->currentTerm);
} else if (pMsg->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) {
(void)syncNodeUpdateCommitIndex(ths, pMsg->fcIndex);
SyncTerm matchTerm = syncLogBufferGetLastMatchTerm(ths->pLogBuf);
if (pMsg->currentTerm == matchTerm) {
(void)syncNodeUpdateCommitIndex(ths, pMsg->commitIndex);
}
if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
sError("vgId:%d, failed to commit raft log since %s. commit index: %" PRId64 "", ths->vgId, terrstr(),
ths->commitIndex);
......@@ -2649,14 +2656,15 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
}
int32_t syncNodeOnLocalCmdOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
ASSERT(false && "deprecated");
SyncLocalCmd* pMsg = pRpcMsg->pCont;
syncLogRecvLocalCmd(ths, pMsg, "");
if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
syncNodeStepDown(ths, pMsg->sdNewTerm);
syncNodeStepDown(ths, pMsg->currentTerm);
} else if (pMsg->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) {
syncNodeFollowerCommit(ths, pMsg->fcIndex);
syncNodeFollowerCommit(ths, pMsg->commitIndex);
} else {
sError("error local cmd");
......
......@@ -265,20 +265,27 @@ int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
return ret;
}
FORCE_INLINE SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf) {
// Return the term of the raft entry sitting in the ring-buffer slot that
// corresponds to pBuf->matchIndex.
//
// This helper performs no locking of its own; NOTE(review): callers are
// presumably expected to already hold pBuf->mutex -- confirm at call sites.
FORCE_INLINE SyncTerm syncLogBufferGetLastMatchTermWithoutLock(SSyncLogBuffer* pBuf) {
  SyncIndex matchIndex = pBuf->matchIndex;

  // Adding pBuf->size before the modulo keeps the slot non-negative for a
  // slightly negative matchIndex.
  SSyncRaftEntry* pMatched = pBuf->entries[(matchIndex + pBuf->size) % pBuf->size].pItem;

  // The slot at matchIndex must be populated.
  ASSERT(pMatched != NULL);

  return pMatched->term;
}
// Locked accessor for the term of the entry at the buffer's match index.
// Takes pBuf->mutex, reads the term through the lock-free helper, releases
// the mutex, and returns the value read.
SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf) {
  SyncTerm lastMatchTerm;

  taosThreadMutexLock(&pBuf->mutex);
  lastMatchTerm = syncLogBufferGetLastMatchTermWithoutLock(pBuf);
  taosThreadMutexUnlock(&pBuf->mutex);

  return lastMatchTerm;
}
int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm) {
taosThreadMutexLock(&pBuf->mutex);
syncLogBufferValidate(pBuf);
int32_t ret = -1;
SyncIndex index = pEntry->index;
SyncIndex prevIndex = pEntry->index - 1;
SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTerm(pBuf);
SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTermWithoutLock(pBuf);
SSyncRaftEntry* pExist = NULL;
bool inBuf = true;
......
......@@ -411,7 +411,7 @@ void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const c
if (!(sDebugFlag & DEBUG_TRACE)) return;
sNTrace(pSyncNode, "recv sync-local-cmd {cmd:%d-%s, sd-new-term:%" PRId64 ", fc-index:%" PRId64 "}, %s", pMsg->cmd,
syncLocalCmdGetStr(pMsg->cmd), pMsg->sdNewTerm, pMsg->fcIndex, s);
syncLocalCmdGetStr(pMsg->cmd), pMsg->currentTerm, pMsg->commitIndex, s);
}
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
......
......@@ -295,6 +295,7 @@ void walAlignVersions(SWal* pWal) {
// reset commitVer and appliedVer
pWal->vers.commitVer = pWal->vers.snapshotVer;
pWal->vers.appliedVer = pWal->vers.snapshotVer;
wInfo("vgId:%d, reset commitVer to %" PRId64, pWal->cfg.vgId, pWal->vers.commitVer);
}
bool walLogEntriesComplete(const SWal* pWal) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册