diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 850468f393a6b1ba0e2b3aab6a9dd23eaeaeb4f3..25835edc926a3b5b7f1b176860d5984a47291be7 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -15,6 +15,7 @@ #include "syncIndexMgr.h" #include "syncInt.h" +#include "syncRaftLog.h" // \* Leader i advances its commitIndex. // \* This is done as a separate step from handling AppendEntries responses, @@ -42,4 +43,25 @@ void syncNodeMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pNextIndex", pSyncNode->pNextIndex); syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pMatchIndex", pSyncNode->pMatchIndex); + + // update commit index + + if (pSyncNode->pFsm != NULL) { + SyncIndex beginIndex = SYNC_INDEX_INVALID; + SyncIndex endIndex = SYNC_INDEX_INVALID; + for (SyncIndex i = beginIndex; i <= endIndex; ++i) { + SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, i); + assert(pEntry != NULL); + + SRpcMsg rpcMsg; + syncEntry2OriginalRpc(pEntry, &rpcMsg); + + if (pSyncNode->pFsm->FpCommitCb != NULL) { + pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0); + } + + rpcFreeCont(rpcMsg.pCont); + syncEntryDestory(pEntry); + } + } } \ No newline at end of file diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index c716a17df8839ca9b6202271b567c0f9f0fa1c14..1f3e709a27e7001e71f064b75d718dfa8f0b0bc9 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -749,9 +749,10 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg SRpcMsg rpcMsg; syncEntry2OriginalRpc(pEntry, &rpcMsg); - assert(ths->pFsm != NULL); - if (ths->pFsm->FpPreCommitCb != NULL) { - ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0); + if (ths->pFsm != NULL) { + if (ths->pFsm->FpPreCommitCb != NULL) { + ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0); + } } rpcFreeCont(rpcMsg.pCont); @@ -760,9 +761,10 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg SRpcMsg rpcMsg; syncEntry2OriginalRpc(pEntry, &rpcMsg); - assert(ths->pFsm != NULL); - if (ths->pFsm->FpPreCommitCb != NULL) { - ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, -1); + if (ths->pFsm != NULL) { + if (ths->pFsm->FpPreCommitCb != NULL) { + ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, -1); + } } rpcFreeCont(rpcMsg.pCont); }