From e58812aab5df79c724ba647d587096c0bfcf9211 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 24 Mar 2022 11:40:36 +0800 Subject: [PATCH] sync refactor --- source/libs/sync/inc/syncRaftEntry.h | 3 ++- source/libs/sync/src/syncAppendEntries.c | 7 ++++--- source/libs/sync/src/syncCommit.c | 2 +- source/libs/sync/src/syncMain.c | 26 +++++++++++++++++++++--- source/libs/sync/src/syncRaftEntry.c | 10 +++++++++ source/libs/sync/src/syncRaftLog.c | 6 +++--- source/libs/sync/test/syncEntryTest.cpp | 2 +- 7 files changed, 44 insertions(+), 12 deletions(-) diff --git a/source/libs/sync/inc/syncRaftEntry.h b/source/libs/sync/inc/syncRaftEntry.h index 876859216f..d4e63e9676 100644 --- a/source/libs/sync/inc/syncRaftEntry.h +++ b/source/libs/sync/inc/syncRaftEntry.h @@ -28,7 +28,7 @@ extern "C" { #include "taosdef.h" typedef enum EntryType { - SYNC_RAFT_ENTRY_NULL = 0, + SYNC_RAFT_ENTRY_NOOP = 0, SYNC_RAFT_ENTRY_DATA = 1, SYNC_RAFT_ENTRY_CONFIG = 2, } EntryType; @@ -49,6 +49,7 @@ typedef struct SSyncRaftEntry { SSyncRaftEntry* syncEntryBuild(uint32_t dataLen); SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index); // step 4 SSyncRaftEntry* syncEntryBuild3(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index, EntryType entryType); +SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index); void syncEntryDestory(SSyncRaftEntry* pEntry); char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len); // step 5 SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len); // step 6 diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 1116c1a905..0a83e09605 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -198,6 +198,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { if (ths->pFsm->FpRollBackCb != NULL) { SSyncRaftEntry* pRollBackEntry = logStoreGetEntry(ths->pLogStore, index); assert(pRollBackEntry != NULL); + assert(pRollBackEntry->entryType == SYNC_RAFT_ENTRY_DATA); SRpcMsg rpcMsg; syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg); @@ -217,7 +218,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); if (ths->pFsm != NULL) { - if (ths->pFsm->FpPreCommitCb != NULL) { + if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->entryType == SYNC_RAFT_ENTRY_DATA) { ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 2, ths->state); } } @@ -242,7 +243,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); if (ths->pFsm != NULL) { - if (ths->pFsm->FpPreCommitCb != NULL) { + if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->entryType == SYNC_RAFT_ENTRY_DATA) { ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 3, ths->state); } } @@ -298,7 +299,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pEntry, &rpcMsg); - if (ths->pFsm->FpCommitCb != NULL) { + if (ths->pFsm->FpCommitCb != NULL && pEntry->entryType == SYNC_RAFT_ENTRY_DATA) { ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0, ths->state); } diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index f581b31c4b..6023c00afa 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -97,7 +97,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pEntry, &rpcMsg); - if (pSyncNode->pFsm->FpCommitCb != NULL) { + if (pSyncNode->pFsm->FpCommitCb != NULL && pEntry->entryType == SYNC_RAFT_ENTRY_DATA) { pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0, pSyncNode->state); } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index f2341ef521..3080cbcd0d 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -41,10 +41,11 @@ static void syncNodeEqPingTimer(void* param, void* tmrId); static void syncNodeEqElectTimer(void* param, void* tmrId); static void syncNodeEqHeartbeatTimer(void* param, void* tmrId); -// on message ---- +// process message ---- static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg); +static int32_t syncNodeAppendNoop(SSyncNode* ths); // life cycle static void syncFreeNode(void* param); @@ -669,6 +670,9 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) { assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE); assert(voteGrantedMajority(pSyncNode->pVotesGranted)); syncNodeBecomeLeader(pSyncNode); + + // Raft 3.6.2 Committing entries from previous terms + syncNodeAppendNoop(pSyncNode); } void syncNodeFollower2Candidate(SSyncNode* pSyncNode) { @@ -851,7 +855,7 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg syncEntry2OriginalRpc(pEntry, &rpcMsg); if (ths->pFsm != NULL) { - if (ths->pFsm->FpPreCommitCb != NULL) { + if (ths->pFsm->FpPreCommitCb != NULL && pEntry->entryType == SYNC_RAFT_ENTRY_DATA) { ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0, ths->state); } } @@ -866,7 +870,7 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg syncEntry2OriginalRpc(pEntry, &rpcMsg); if (ths->pFsm != NULL) { - if (ths->pFsm->FpPreCommitCb != NULL) { + if (ths->pFsm->FpPreCommitCb != NULL && pEntry->entryType == SYNC_RAFT_ENTRY_DATA) { ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 1, ths->state); } } @@ -877,6 +881,22 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg return ret; } +static int32_t syncNodeAppendNoop(SSyncNode* ths) { + int32_t ret = 0; + + SyncIndex index = ths->pLogStore->getLastIndex(ths->pLogStore) + 1; + SyncTerm term = ths->pRaftStore->currentTerm; + SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index); + assert(pEntry != NULL); + + if (ths->state == TAOS_SYNC_STATE_LEADER) { + ths->pLogStore->appendEntry(ths->pLogStore, pEntry); + syncNodeReplicate(ths); + } + + return ret; +} + static void syncFreeNode(void* param) { SSyncNode* pNode = param; syncNodePrint2((char*)"==syncFreeNode==", pNode); diff --git a/source/libs/sync/src/syncRaftEntry.c b/source/libs/sync/src/syncRaftEntry.c index 30ffebbc32..9b9842b81d 100644 --- a/source/libs/sync/src/syncRaftEntry.c +++ b/source/libs/sync/src/syncRaftEntry.c @@ -51,6 +51,16 @@ SSyncRaftEntry* syncEntryBuild3(SyncClientRequest* pMsg, SyncTerm term, SyncInde return pEntry; } +SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index) { + SSyncRaftEntry* pEntry = syncEntryBuild(0); + assert(pEntry != NULL); + pEntry->term = term; + pEntry->index = index; + pEntry->entryType = SYNC_RAFT_ENTRY_NOOP; + + return pEntry; +} + void syncEntryDestory(SSyncRaftEntry* pEntry) { if (pEntry != NULL) { free(pEntry); diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index f569c5d621..cad2d5da41 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -34,7 +34,7 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { pLogStore->getLastTerm = logStoreLastTerm; pLogStore->updateCommitIndex = logStoreUpdateCommitIndex; pLogStore->getCommitIndex = logStoreGetCommitIndex; - return pLogStore; // to avoid compiler error + return pLogStore; } void logStoreDestory(SSyncLogStore* pLogStore) { @@ -54,12 +54,12 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { assert(serialized != NULL); int code; - code = walWrite(pWal, pEntry->index, pEntry->msgType, serialized, len); + code = walWrite(pWal, pEntry->index, pEntry->entryType, serialized, len); assert(code == 0); walFsync(pWal, true); free(serialized); - return code; // to avoid compiler error + return code; } SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { diff --git a/source/libs/sync/test/syncEntryTest.cpp b/source/libs/sync/test/syncEntryTest.cpp index 7f211276f8..4acdb6052e 100644 --- a/source/libs/sync/test/syncEntryTest.cpp +++ b/source/libs/sync/test/syncEntryTest.cpp @@ -52,7 +52,7 @@ void test3() { pSyncMsg->isWeak = 1; strcpy(pSyncMsg->data, "test3"); - SSyncRaftEntry* pEntry = syncEntryBuild3(pSyncMsg, 100, 200, SYNC_RAFT_ENTRY_NULL); + SSyncRaftEntry* pEntry = syncEntryBuild3(pSyncMsg, 100, 200, SYNC_RAFT_ENTRY_NOOP); syncEntryPrint(pEntry); syncClientRequestDestroy(pSyncMsg); -- GitLab