diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 3bd94dbab5811fc071d7444ab864d3087f785d53..49486bc12dbc8cea7318def97a12cc160d6ed42a 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -247,8 +247,8 @@ typedef struct SyncLocalCmd { SRaftId destId; int32_t cmd; - SyncTerm sdNewTerm; // step down new term - SyncIndex fcIndex; // follower commit index + SyncTerm currentTerm; // step down new term + SyncIndex commitIndex; // follower commit index } SyncLocalCmd; int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType ttype, uint64_t logicClock, int32_t ms, SSyncNode* pNode); diff --git a/source/libs/sync/inc/syncPipeline.h b/source/libs/sync/inc/syncPipeline.h index a0a0691694776f2830d4f80ec2e96ec8bf782921..55cb0d7db685304053fdafff092a8206a12a7d73 100644 --- a/source/libs/sync/inc/syncPipeline.h +++ b/source/libs/sync/inc/syncPipeline.h @@ -98,6 +98,7 @@ int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode); // access int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf); +SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf); int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry); int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm); int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 1e5adb4bed1aa711e37d88435d82f51765711aa8..66ff28d07d36d2be693c3953e8dbd7fd6cd9f33f 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -90,6 +90,7 @@ // int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex) { + ASSERT(false && "deprecated"); if (ths->state != TAOS_SYNC_STATE_FOLLOWER) { sNTrace(ths, "can not do follower commit"); return -1; diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 5fdcbeb91c119bfebb21805b12fc1faa15eb2cab..152fddb7e68127bac57947faf04a7ac8b907ff20 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -44,6 +44,7 @@ // /\ UNCHANGED <> // void syncOneReplicaAdvance(SSyncNode* pSyncNode) { + ASSERT(false && "deprecated"); if (pSyncNode == NULL) { sError("pSyncNode is NULL"); return; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 1a481a7e1452b852c44d655a33c3fdb06b79fc1a..7a6c0f734f968b7f69cabf4e5f387419a5ffeb8d 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1036,6 +1036,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { } } pSyncNode->commitIndex = commitIndex; + sInfo("vgId:%d, sync node commitIndex initialized as %" PRId64, pSyncNode->vgId, pSyncNode->commitIndex); if (syncNodeLogStoreRestoreOnNeed(pSyncNode) < 0) { goto _error; @@ -1176,9 +1177,10 @@ int32_t syncNodeRestore(SSyncNode* pSyncNode) { } ASSERT(endIndex == lastVer + 1); - commitIndex = TMAX(pSyncNode->commitIndex, commitIndex); + pSyncNode->commitIndex = TMAX(pSyncNode->commitIndex, commitIndex); + sInfo("vgId:%d, restore sync until commitIndex:%" PRId64, pSyncNode->vgId, pSyncNode->commitIndex); - if (syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, commitIndex) < 0) { + if (syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, pSyncNode->commitIndex) < 0) { return -1; } @@ -2545,8 +2547,9 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont; pSyncMsg->cmd = SYNC_LOCAL_CMD_FOLLOWER_CMT; - pSyncMsg->fcIndex = pMsg->commitIndex; - SyncIndex fcIndex = pSyncMsg->fcIndex; + pSyncMsg->commitIndex = pMsg->commitIndex; + pSyncMsg->currentTerm = pMsg->term; + SyncIndex fcIndex = pSyncMsg->commitIndex; if (ths->syncEqMsg != NULL && ths->msgcb != NULL) { int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd); @@ -2567,7 +2570,8 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont; pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN; - pSyncMsg->sdNewTerm = pMsg->term; + pSyncMsg->currentTerm = pMsg->term; + pSyncMsg->commitIndex = pMsg->commitIndex; if (ths->syncEqMsg != NULL && ths->msgcb != NULL) { int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd); @@ -2575,7 +2579,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code); rpcFreeCont(rpcMsgLocalCmd.pCont); } else { - sTrace("vgId:%d, sync enqueue step-down msg, new-term: %" PRId64, ths->vgId, pSyncMsg->sdNewTerm); + sTrace("vgId:%d, sync enqueue step-down msg, new-term: %" PRId64, ths->vgId, pSyncMsg->currentTerm); } } } @@ -2633,10 +2637,13 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) { syncLogRecvLocalCmd(ths, pMsg, ""); if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) { - syncNodeStepDown(ths, pMsg->sdNewTerm); + syncNodeStepDown(ths, pMsg->currentTerm); } else if (pMsg->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) { - (void)syncNodeUpdateCommitIndex(ths, pMsg->fcIndex); + SyncTerm matchTerm = syncLogBufferGetLastMatchTerm(ths->pLogBuf); + if (pMsg->currentTerm == matchTerm) { + (void)syncNodeUpdateCommitIndex(ths, pMsg->commitIndex); + } if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) { sError("vgId:%d, failed to commit raft log since %s. commit index: %" PRId64 "", ths->vgId, terrstr(), ths->commitIndex); @@ -2649,14 +2656,15 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) { } int32_t syncNodeOnLocalCmdOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) { + ASSERT(false && "deprecated"); SyncLocalCmd* pMsg = pRpcMsg->pCont; syncLogRecvLocalCmd(ths, pMsg, ""); if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) { - syncNodeStepDown(ths, pMsg->sdNewTerm); + syncNodeStepDown(ths, pMsg->currentTerm); } else if (pMsg->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) { - syncNodeFollowerCommit(ths, pMsg->fcIndex); + syncNodeFollowerCommit(ths, pMsg->commitIndex); } else { sError("error local cmd"); diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index d88e6103729663537b9e5b9ea82081b6e8f9f4a5..de9cd6e1a63fce679819d69ee00d512a2bc92d03 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -265,20 +265,27 @@ int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) { return ret; } -FORCE_INLINE SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf) { +FORCE_INLINE SyncTerm syncLogBufferGetLastMatchTermWithoutLock(SSyncLogBuffer* pBuf) { SyncIndex index = pBuf->matchIndex; SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem; ASSERT(pEntry != NULL); return pEntry->term; } +SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf) { + taosThreadMutexLock(&pBuf->mutex); + SyncTerm term = syncLogBufferGetLastMatchTermWithoutLock(pBuf); + taosThreadMutexUnlock(&pBuf->mutex); + return term; +} + int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm) { taosThreadMutexLock(&pBuf->mutex); syncLogBufferValidate(pBuf); int32_t ret = -1; SyncIndex index = pEntry->index; SyncIndex prevIndex = pEntry->index - 1; - SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTerm(pBuf); + SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTermWithoutLock(pBuf); SSyncRaftEntry* pExist = NULL; bool inBuf = true; diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 49a24bebde3774640c2636e7964a2c18e3a1266f..525681e53e7f94a2453e756bc7969bd255a7c0a1 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -411,7 +411,7 @@ void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const c if (!(sDebugFlag & DEBUG_TRACE)) return; sNTrace(pSyncNode, "recv sync-local-cmd {cmd:%d-%s, sd-new-term:%" PRId64 ", fc-index:%" PRId64 "}, %s", pMsg->cmd, - syncLocalCmdGetStr(pMsg->cmd), pMsg->sdNewTerm, pMsg->fcIndex, s); + syncLocalCmdGetStr(pMsg->cmd), pMsg->currentTerm, pMsg->commitIndex, s); } void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) { diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index aeb0fe9fe9851b7687e153666ede2fc821d1aadc..44e88a4dcc425a728789a5e95b9fbf19bcaae415 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -295,6 +295,7 @@ void walAlignVersions(SWal* pWal) { // reset commitVer and appliedVer pWal->vers.commitVer = pWal->vers.snapshotVer; pWal->vers.appliedVer = pWal->vers.snapshotVer; + wInfo("vgId:%d, reset commitVer to %" PRId64, pWal->cfg.vgId, pWal->vers.commitVer); } bool walLogEntriesComplete(const SWal* pWal) {