From ef61d3ee947d6609c43fcadcada24554e8b8df63 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Fri, 18 Mar 2022 11:47:46 +0800 Subject: [PATCH] sync refactor --- source/libs/sync/inc/syncCommit.h | 25 ++++++++++++ source/libs/sync/src/syncCommit.c | 64 ++++++++++++++++++++++++------- 2 files changed, 75 insertions(+), 14 deletions(-) diff --git a/source/libs/sync/inc/syncCommit.h b/source/libs/sync/inc/syncCommit.h index 4b327935c0..c76236d5bf 100644 --- a/source/libs/sync/inc/syncCommit.h +++ b/source/libs/sync/inc/syncCommit.h @@ -26,7 +26,32 @@ extern "C" { #include "syncInt.h" #include "taosdef.h" +// \* Leader i advances its commitIndex. +// \* This is done as a separate step from handling AppendEntries responses, +// \* in part to minimize atomic regions, and in part so that leaders of +// \* single-server clusters are able to mark entries committed. +// AdvanceCommitIndex(i) == +// /\ state[i] = Leader +// /\ LET \* The set of servers that agree up through index. +// Agree(index) == {i} \cup {k \in Server : +// matchIndex[i][k] >= index} +// \* The maximum indexes for which a quorum agrees +// agreeIndexes == {index \in 1..Len(log[i]) : +// Agree(index) \in Quorum} +// \* New value for commitIndex'[i] +// newCommitIndex == +// IF /\ agreeIndexes /= {} +// /\ log[i][Max(agreeIndexes)].term = currentTerm[i] +// THEN +// Max(agreeIndexes) +// ELSE +// commitIndex[i] +// IN commitIndex' = [commitIndex EXCEPT ![i] = newCommitIndex] +// /\ UNCHANGED <> +// void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode); +bool syncAgreeIndex(SSyncNode* pSyncNode, SRaftId* pRaftId, SyncIndex index); +bool syncAgree(SSyncNode* pSyncNode, SyncIndex index); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 0b27f35220..539bb21b71 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -46,25 +46,61 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pMatchIndex", pSyncNode->pMatchIndex); // update commit index + SyncIndex newCommitIndex = pSyncNode->commitIndex; + for (SyncIndex index = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore); index > pSyncNode->commitIndex; + ++index) { + if (syncAgree(pSyncNode, index)) { + newCommitIndex = index; + break; + } + } - if (pSyncNode->pFsm != NULL) { - SyncIndex beginIndex = SYNC_INDEX_INVALID; - SyncIndex endIndex = SYNC_INDEX_INVALID; - for (SyncIndex i = beginIndex; i <= endIndex; ++i) { - if (i != SYNC_INDEX_INVALID) { - SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, i); - assert(pEntry != NULL); + if (newCommitIndex > pSyncNode->commitIndex) { + SyncIndex beginIndex = pSyncNode->commitIndex + 1; + SyncIndex endIndex = newCommitIndex; + pSyncNode->commitIndex = newCommitIndex; - SRpcMsg rpcMsg; - syncEntry2OriginalRpc(pEntry, &rpcMsg); + if (pSyncNode->pFsm != NULL) { + for (SyncIndex i = beginIndex; i <= endIndex; ++i) { + if (i != SYNC_INDEX_INVALID) { + SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, i); + assert(pEntry != NULL); - if (pSyncNode->pFsm->FpCommitCb != NULL) { - pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0); - } + SRpcMsg rpcMsg; + syncEntry2OriginalRpc(pEntry, &rpcMsg); - rpcFreeCont(rpcMsg.pCont); - syncEntryDestory(pEntry); + if (pSyncNode->pFsm->FpCommitCb != NULL) { + pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0); + } + + rpcFreeCont(rpcMsg.pCont); + syncEntryDestory(pEntry); + } } } } +} + +bool syncAgreeIndex(SSyncNode* pSyncNode, SRaftId* pRaftId, SyncIndex index) { + SyncIndex matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, pRaftId); + + // b for debug + bool b = false; + if (matchIndex >= index) { + b = true; + } + return b; +} + +bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) { + int agreeCount = 0; + for (int i = 0; i < pSyncNode->replicaNum; ++i) { + if (syncAgreeIndex(pSyncNode, &(pSyncNode->replicasId[i]), index)) { + ++agreeCount; + } + if (agreeCount >= pSyncNode->quorum) { + return true; + } + } + return false; } \ No newline at end of file -- GitLab