diff --git a/source/libs/sync/inc/syncRaftEntry.h b/source/libs/sync/inc/syncRaftEntry.h index 876859216f26e8c15c50c69d1ff74175eec69186..d4e63e9676d519c0ae92a0d93a7199f780c58eee 100644 --- a/source/libs/sync/inc/syncRaftEntry.h +++ b/source/libs/sync/inc/syncRaftEntry.h @@ -28,7 +28,7 @@ extern "C" { #include "taosdef.h" typedef enum EntryType { - SYNC_RAFT_ENTRY_NULL = 0, + SYNC_RAFT_ENTRY_NOOP = 0, SYNC_RAFT_ENTRY_DATA = 1, SYNC_RAFT_ENTRY_CONFIG = 2, } EntryType; @@ -49,6 +49,7 @@ typedef struct SSyncRaftEntry { SSyncRaftEntry* syncEntryBuild(uint32_t dataLen); SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index); // step 4 SSyncRaftEntry* syncEntryBuild3(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index, EntryType entryType); +SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index); void syncEntryDestory(SSyncRaftEntry* pEntry); char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len); // step 5 SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len); // step 6 diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 1116c1a9050753f1d3e8d47e4b0b343081b3f2d5..0a83e09605a962d621b6df6f1b784b092f227974 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -198,6 +198,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { if (ths->pFsm->FpRollBackCb != NULL) { SSyncRaftEntry* pRollBackEntry = logStoreGetEntry(ths->pLogStore, index); assert(pRollBackEntry != NULL); + assert(pRollBackEntry->entryType == SYNC_RAFT_ENTRY_DATA); SRpcMsg rpcMsg; syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg); @@ -217,7 +218,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); if (ths->pFsm != NULL) { - if (ths->pFsm->FpPreCommitCb != NULL) { + if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->entryType == SYNC_RAFT_ENTRY_DATA) { ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 2, ths->state); } } @@ -242,7 +243,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); if (ths->pFsm != NULL) { - if (ths->pFsm->FpPreCommitCb != NULL) { + if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->entryType == SYNC_RAFT_ENTRY_DATA) { ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 3, ths->state); } } @@ -298,7 +299,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pEntry, &rpcMsg); - if (ths->pFsm->FpCommitCb != NULL) { + if (ths->pFsm->FpCommitCb != NULL && pEntry->entryType == SYNC_RAFT_ENTRY_DATA) { ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0, ths->state); } diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index f581b31c4b34dbd733a207d40fda67ede825d1f5..6023c00afa183716c06828470a1e91e0fdec087b 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -97,7 +97,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pEntry, &rpcMsg); - if (pSyncNode->pFsm->FpCommitCb != NULL) { + if (pSyncNode->pFsm->FpCommitCb != NULL && pEntry->entryType == SYNC_RAFT_ENTRY_DATA) { pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0, pSyncNode->state); } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index f2341ef521f8516c4c56186a1fa9a70f136929b2..3080cbcd0d381c722096e198e2c0d3f8d905c997 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -41,10 +41,11 @@ static void syncNodeEqPingTimer(void* param, void* tmrId); static void syncNodeEqElectTimer(void* param, void* tmrId); static void syncNodeEqHeartbeatTimer(void* param, void* tmrId); -// on message ---- +// process message ---- static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg); +static int32_t syncNodeAppendNoop(SSyncNode* ths); // life cycle static void syncFreeNode(void* param); @@ -669,6 +670,9 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) { assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE); assert(voteGrantedMajority(pSyncNode->pVotesGranted)); syncNodeBecomeLeader(pSyncNode); + + // Raft 3.6.2 Committing entries from previous terms + syncNodeAppendNoop(pSyncNode); } void syncNodeFollower2Candidate(SSyncNode* pSyncNode) { @@ -851,7 +855,7 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg syncEntry2OriginalRpc(pEntry, &rpcMsg); if (ths->pFsm != NULL) { - if (ths->pFsm->FpPreCommitCb != NULL) { + if (ths->pFsm->FpPreCommitCb != NULL && pEntry->entryType == SYNC_RAFT_ENTRY_DATA) { ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0, ths->state); } } @@ -866,7 +870,7 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg syncEntry2OriginalRpc(pEntry, &rpcMsg); if (ths->pFsm != NULL) { - if (ths->pFsm->FpPreCommitCb != NULL) { + if (ths->pFsm->FpPreCommitCb != NULL && pEntry->entryType == SYNC_RAFT_ENTRY_DATA) { ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 1, ths->state); } } @@ -877,6 +881,22 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg return ret; } +static int32_t syncNodeAppendNoop(SSyncNode* ths) { + int32_t ret = 0; + + SyncIndex index = ths->pLogStore->getLastIndex(ths->pLogStore) + 1; + SyncTerm term = ths->pRaftStore->currentTerm; + SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index); + assert(pEntry != NULL); + + if (ths->state == TAOS_SYNC_STATE_LEADER) { + ths->pLogStore->appendEntry(ths->pLogStore, pEntry); + syncNodeReplicate(ths); + } + + return ret; +} + static void syncFreeNode(void* param) { SSyncNode* pNode = param; syncNodePrint2((char*)"==syncFreeNode==", pNode); diff --git a/source/libs/sync/src/syncRaftEntry.c b/source/libs/sync/src/syncRaftEntry.c index 30ffebbc3204ed279cdc700e47b4d1e1d5c77fd0..9b9842b81d5c8cc9dd2a64dc0ba3ff2634907fa4 100644 --- a/source/libs/sync/src/syncRaftEntry.c +++ b/source/libs/sync/src/syncRaftEntry.c @@ -51,6 +51,16 @@ SSyncRaftEntry* syncEntryBuild3(SyncClientRequest* pMsg, SyncTerm term, SyncInde return pEntry; } +SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index) { + SSyncRaftEntry* pEntry = syncEntryBuild(0); + assert(pEntry != NULL); + pEntry->term = term; + pEntry->index = index; + pEntry->entryType = SYNC_RAFT_ENTRY_NOOP; + + return pEntry; +} + void syncEntryDestory(SSyncRaftEntry* pEntry) { if (pEntry != NULL) { free(pEntry); diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index f569c5d6216a3606ce3a9218246b35bb7da0739b..cad2d5da41c56bfed8e0f4b86a38e92892aed586 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -34,7 +34,7 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { pLogStore->getLastTerm = logStoreLastTerm; pLogStore->updateCommitIndex = logStoreUpdateCommitIndex; pLogStore->getCommitIndex = logStoreGetCommitIndex; - return pLogStore; // to avoid compiler error + return pLogStore; } void logStoreDestory(SSyncLogStore* pLogStore) { @@ -54,12 +54,12 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { assert(serialized != NULL); int code; - code = walWrite(pWal, pEntry->index, pEntry->msgType, serialized, len); + code = walWrite(pWal, pEntry->index, pEntry->entryType, serialized, len); assert(code == 0); walFsync(pWal, true); free(serialized); - return code; // to avoid compiler error + return code; } SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { diff --git a/source/libs/sync/test/syncEntryTest.cpp b/source/libs/sync/test/syncEntryTest.cpp index 7f211276f866ed974b6b9b455f366434f759b259..4acdb6052ea9ef273ae8d0715589694a580b223f 100644 --- a/source/libs/sync/test/syncEntryTest.cpp +++ b/source/libs/sync/test/syncEntryTest.cpp @@ -52,7 +52,7 @@ void test3() { pSyncMsg->isWeak = 1; strcpy(pSyncMsg->data, "test3"); - SSyncRaftEntry* pEntry = syncEntryBuild3(pSyncMsg, 100, 200, SYNC_RAFT_ENTRY_NULL); + SSyncRaftEntry* pEntry = syncEntryBuild3(pSyncMsg, 100, 200, SYNC_RAFT_ENTRY_NOOP); syncEntryPrint(pEntry); syncClientRequestDestroy(pSyncMsg);