From e5b0e1465d47e783a7a0a22a05ba78d98cf9acac Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 17 Mar 2022 16:33:10 +0800 Subject: [PATCH] sync refactor --- source/libs/sync/src/syncCommit.c | 22 ++++++++++++++++++++++ source/libs/sync/src/syncMain.c | 14 ++++++++------ 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 850468f393..25835edc92 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -15,6 +15,7 @@ #include "syncIndexMgr.h" #include "syncInt.h" +#include "syncRaftLog.h" // \* Leader i advances its commitIndex. // \* This is done as a separate step from handling AppendEntries responses, @@ -42,4 +43,25 @@ void syncNodeMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pNextIndex", pSyncNode->pNextIndex); syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pMatchIndex", pSyncNode->pMatchIndex); + + // update commit index + + if (pSyncNode->pFsm != NULL) { + SyncIndex beginIndex = SYNC_INDEX_INVALID; + SyncIndex endIndex = SYNC_INDEX_INVALID; + for (SyncIndex i = beginIndex; i <= endIndex; ++i) { + SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, i); + assert(pEntry != NULL); + + SRpcMsg rpcMsg; + syncEntry2OriginalRpc(pEntry, &rpcMsg); + + if (pSyncNode->pFsm->FpCommitCb != NULL) { + pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0); + } + + rpcFreeCont(rpcMsg.pCont); + syncEntryDestory(pEntry); + } + } } \ No newline at end of file diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index c716a17df8..1f3e709a27 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -749,9 +749,10 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg SRpcMsg rpcMsg; syncEntry2OriginalRpc(pEntry, &rpcMsg); - assert(ths->pFsm != NULL); - if (ths->pFsm->FpPreCommitCb != NULL) { - ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0); + if (ths->pFsm != NULL) { + if (ths->pFsm->FpPreCommitCb != NULL) { + ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0); + } } rpcFreeCont(rpcMsg.pCont); @@ -760,9 +761,10 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg SRpcMsg rpcMsg; syncEntry2OriginalRpc(pEntry, &rpcMsg); - assert(ths->pFsm != NULL); - if (ths->pFsm->FpPreCommitCb != NULL) { - ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, -1); + if (ths->pFsm != NULL) { + if (ths->pFsm->FpPreCommitCb != NULL) { + ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, -1); + } } rpcFreeCont(rpcMsg.pCont); } -- GitLab