From 2aeda3a94171340ab5b8fc3ca0b58e6db59b0813 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 9 Jan 2023 12:01:36 +0800 Subject: [PATCH] enh: refact raft store file --- source/libs/sync/inc/syncInt.h | 17 +- source/libs/sync/inc/syncRaftStore.h | 29 +- source/libs/sync/src/syncAppendEntries.c | 12 +- source/libs/sync/src/syncAppendEntriesReply.c | 12 +- source/libs/sync/src/syncCommit.c | 4 +- source/libs/sync/src/syncElection.c | 10 +- source/libs/sync/src/syncMain.c | 91 +++---- source/libs/sync/src/syncMessage.c | 2 +- source/libs/sync/src/syncPipeline.c | 46 ++-- source/libs/sync/src/syncRaftStore.c | 247 +++++++++--------- source/libs/sync/src/syncReplication.c | 4 +- source/libs/sync/src/syncRequestVote.c | 34 +-- source/libs/sync/src/syncRequestVoteReply.c | 10 +- source/libs/sync/src/syncRespMgr.c | 2 +- source/libs/sync/src/syncSnapshot.c | 40 +-- source/libs/sync/src/syncUtil.c | 12 +- .../test/sync_test_lib/src/syncMainDebug.c | 4 +- .../sync_test_lib/src/syncSnapshotDebug.c | 2 +- 18 files changed, 281 insertions(+), 297 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 6793430923..7e08e195c1 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -32,11 +32,9 @@ typedef struct SyncRequestVoteReply SyncRequestVoteReply; typedef struct SyncAppendEntries SyncAppendEntries; typedef struct SyncAppendEntriesReply SyncAppendEntriesReply; typedef struct SSyncEnv SSyncEnv; -typedef struct SRaftStore SRaftStore; typedef struct SVotesGranted SVotesGranted; typedef struct SVotesRespond SVotesRespond; typedef struct SSyncIndexMgr SSyncIndexMgr; -typedef struct SRaftCfg SRaftCfg; typedef struct SSyncRespMgr SSyncRespMgr; typedef struct SSyncSnapshotSender SSyncSnapshotSender; typedef struct SSyncSnapshotReceiver SSyncSnapshotReceiver; @@ -70,6 +68,11 @@ typedef struct SRaftId { SyncGroupId vgId; } SRaftId; +typedef struct SRaftStore { + SyncTerm currentTerm; + SRaftId voteFor; +} SRaftStore; + typedef struct SSyncHbTimerData { int64_t syncNodeRid; SSyncTimer* pTimer; @@ -112,8 +115,8 @@ typedef struct SSyncNode { // sync io SSyncLogBuffer* pLogBuf; - SWal* pWal; - const SMsgCb* msgcb; + SWal* pWal; + const SMsgCb* msgcb; int32_t (*syncSendMSg)(const SEpSet* pEpSet, SRpcMsg* pMsg); int32_t (*syncEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); int32_t (*syncEqCtrlMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); @@ -139,8 +142,8 @@ typedef struct SSyncNode { int64_t rid; // tla+ server vars - ESyncState state; - SRaftStore* pRaftStore; + ESyncState state; + SRaftStore raftStore; // tla+ candidate vars SVotesGranted* pVotesGranted; @@ -229,7 +232,7 @@ int32_t syncNodeStartStandBy(SSyncNode* pSyncNode); void syncNodeClose(SSyncNode* pSyncNode); void syncNodePreClose(SSyncNode* pSyncNode); void syncNodePostClose(SSyncNode* pSyncNode); -int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t *seq); +int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq); int32_t syncNodeRestore(SSyncNode* pSyncNode); void syncHbTimerDataFree(SSyncHbTimerData* pData); diff --git a/source/libs/sync/inc/syncRaftStore.h b/source/libs/sync/inc/syncRaftStore.h index bb6405f6b2..21a8fc64a8 100644 --- a/source/libs/sync/inc/syncRaftStore.h +++ b/source/libs/sync/inc/syncRaftStore.h @@ -24,27 +24,16 @@ extern "C" { #define RAFT_STORE_BLOCK_SIZE 512 #define RAFT_STORE_PATH_LEN (TSDB_FILENAME_LEN * 2) +#define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0}) -#define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0}) - -typedef struct SRaftStore { - SyncTerm currentTerm; - SRaftId voteFor; - TdFilePtr pFile; - char path[RAFT_STORE_PATH_LEN]; -} SRaftStore; - -SRaftStore *raftStoreOpen(const char *path); -int32_t raftStoreClose(SRaftStore *pRaftStore); -int32_t raftStorePersist(SRaftStore *pRaftStore); -int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len); -int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len); - -bool raftStoreHasVoted(SRaftStore *pRaftStore); -void raftStoreVote(SRaftStore *pRaftStore, SRaftId *pRaftId); -void raftStoreClearVote(SRaftStore *pRaftStore); -void raftStoreNextTerm(SRaftStore *pRaftStore); -void raftStoreSetTerm(SRaftStore *pRaftStore, SyncTerm term); +int32_t raftStoreReadFile(SSyncNode *pNode); +int32_t raftStoreWriteFile(SSyncNode *pNode); + +bool raftStoreHasVoted(SSyncNode *pNode); +void raftStoreVote(SSyncNode *pNode, SRaftId *pRaftId); +void raftStoreClearVote(SSyncNode *pNode); +void raftStoreNextTerm(SSyncNode *pNode); +void raftStoreSetTerm(SSyncNode *pNode, SyncTerm term); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 026ebdb37c..83d1777f44 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -159,17 +159,17 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) { // prepare response msg pReply->srcId = ths->myRaftId; pReply->destId = pMsg->srcId; - pReply->term = ths->pRaftStore->currentTerm; + pReply->term = ths->raftStore.currentTerm; pReply->success = false; pReply->matchIndex = SYNC_INDEX_INVALID; pReply->lastSendIndex = pMsg->prevLogIndex + 1; pReply->startTime = ths->startTime; - if (pMsg->term < ths->pRaftStore->currentTerm) { + if (pMsg->term < ths->raftStore.currentTerm) { goto _SEND_RESPONSE; } - if (pMsg->term > ths->pRaftStore->currentTerm) { + if (pMsg->term > ths->raftStore.currentTerm) { pReply->term = pMsg->term; } @@ -253,19 +253,19 @@ int32_t syncNodeOnAppendEntriesOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) { SyncAppendEntriesReply* pReply = rpcRsp.pCont; pReply->srcId = ths->myRaftId; pReply->destId = pMsg->srcId; - pReply->term = ths->pRaftStore->currentTerm; + pReply->term = ths->raftStore.currentTerm; pReply->success = false; // pReply->matchIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore); pReply->matchIndex = SYNC_INDEX_INVALID; pReply->lastSendIndex = pMsg->prevLogIndex + 1; pReply->startTime = ths->startTime; - if (pMsg->term < ths->pRaftStore->currentTerm) { + if (pMsg->term < ths->raftStore.currentTerm) { syncLogRecvAppendEntries(ths, pMsg, "reject, small term"); goto _SEND_RESPONSE; } - if (pMsg->term > ths->pRaftStore->currentTerm) { + if (pMsg->term > ths->raftStore.currentTerm) { pReply->term = pMsg->term; } diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index b83be2bebb..8157a5a14f 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -50,19 +50,19 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { } // drop stale response - if (pMsg->term < ths->pRaftStore->currentTerm) { + if (pMsg->term < ths->raftStore.currentTerm) { syncLogRecvAppendEntriesReply(ths, pMsg, "drop stale response"); return 0; } if (ths->state == TAOS_SYNC_STATE_LEADER) { - if (pMsg->term > ths->pRaftStore->currentTerm) { + if (pMsg->term > ths->raftStore.currentTerm) { syncLogRecvAppendEntriesReply(ths, pMsg, "error term"); syncNodeStepDown(ths, pMsg->term); return -1; } - ASSERT(pMsg->term == ths->pRaftStore->currentTerm); + ASSERT(pMsg->term == ths->raftStore.currentTerm); sTrace("vgId:%d, received append entries reply. srcId:0x%016" PRIx64 ", term:%" PRId64 ", matchIndex:%" PRId64 "", pMsg->vgId, pMsg->srcId.addr, pMsg->term, pMsg->matchIndex); @@ -100,19 +100,19 @@ int32_t syncNodeOnAppendEntriesReplyOld(SSyncNode* ths, SyncAppendEntriesReply* } // drop stale response - if (pMsg->term < ths->pRaftStore->currentTerm) { + if (pMsg->term < ths->raftStore.currentTerm) { syncLogRecvAppendEntriesReply(ths, pMsg, "drop stale response"); return 0; } if (ths->state == TAOS_SYNC_STATE_LEADER) { - if (pMsg->term > ths->pRaftStore->currentTerm) { + if (pMsg->term > ths->raftStore.currentTerm) { syncLogRecvAppendEntriesReply(ths, pMsg, "error term"); syncNodeStepDown(ths, pMsg->term); return -1; } - ASSERT(pMsg->term == ths->pRaftStore->currentTerm); + ASSERT(pMsg->term == ths->raftStore.currentTerm); if (pMsg->success) { SyncIndex oldMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId)); diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 152fddb7e6..286cf4daf5 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -133,7 +133,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { } } // cannot commit, even if quorum agree. need check term! - if (pEntry->term <= pSyncNode->pRaftStore->currentTerm) { + if (pEntry->term <= pSyncNode->raftStore.currentTerm) { // update commit index newCommitIndex = index; @@ -329,7 +329,7 @@ int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely) { SyncIndex commitIndex = indexLikely; syncNodeUpdateCommitIndex(ths, commitIndex); sTrace("vgId:%d, agreed upon. role:%d, term:%" PRId64 ", index: %" PRId64 "", ths->vgId, ths->state, - ths->pRaftStore->currentTerm, commitIndex); + ths->raftStore.currentTerm, commitIndex); } return ths->commitIndex; } diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index bcc95c5f10..cd3ffc33e3 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -48,7 +48,7 @@ static int32_t syncNodeRequestVotePeers(SSyncNode* pNode) { SyncRequestVote* pMsg = rpcMsg.pCont; pMsg->srcId = pNode->myRaftId; pMsg->destId = pNode->peersId[i]; - pMsg->term = pNode->pRaftStore->currentTerm; + pMsg->term = pNode->raftStore.currentTerm; ret = syncNodeGetLastIndexTerm(pNode, &pMsg->lastLogIndex, &pMsg->lastLogTerm); ASSERT(ret == 0); @@ -75,10 +75,10 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) { } // start election - raftStoreNextTerm(pSyncNode->pRaftStore); - raftStoreClearVote(pSyncNode->pRaftStore); - voteGrantedReset(pSyncNode->pVotesGranted, pSyncNode->pRaftStore->currentTerm); - votesRespondReset(pSyncNode->pVotesRespond, pSyncNode->pRaftStore->currentTerm); + raftStoreNextTerm(pSyncNode); + raftStoreClearVote(pSyncNode); + voteGrantedReset(pSyncNode->pVotesGranted, pSyncNode->raftStore.currentTerm); + votesRespondReset(pSyncNode->pVotesRespond, pSyncNode->raftStore.currentTerm); syncNodeVoteForSelf(pSyncNode); if (voteGrantedMajority(pSyncNode->pVotesGranted)) { diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index c2bf6dc837..a339cb9857 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -468,7 +468,7 @@ bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) { } if (code == 0 && pEntry != NULL) { - if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == pSyncNode->pRaftStore->currentTerm) { + if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == pSyncNode->raftStore.currentTerm) { ready = true; } @@ -736,7 +736,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_ int32_t code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex); if (code == 0) { pMsg->info.conn.applyIndex = retIndex; - pMsg->info.conn.applyTerm = pSyncNode->pRaftStore->currentTerm; + pMsg->info.conn.applyTerm = pSyncNode->raftStore.currentTerm; sTrace("vgId:%d, propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex, TMSG_INFO(pMsg->msgType)); return 1; @@ -983,8 +983,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { // init TLA+ server vars pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; - pSyncNode->pRaftStore = raftStoreOpen(pSyncNode->raftStorePath); - if (pSyncNode->pRaftStore == NULL) { + if (raftStoreReadFile(pSyncNode) != 0) { sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath); goto _error; } @@ -1184,7 +1183,7 @@ int32_t syncNodeRestore(SSyncNode* pSyncNode) { int32_t syncNodeStart(SSyncNode* pSyncNode) { // start raft if (pSyncNode->replicaNum == 1) { - raftStoreNextTerm(pSyncNode->pRaftStore); + raftStoreNextTerm(pSyncNode); syncNodeBecomeLeader(pSyncNode, "one replica start"); // Raft 3.6.2 Committing entries from previous terms @@ -1202,7 +1201,7 @@ int32_t syncNodeStart(SSyncNode* pSyncNode) { void syncNodeStartOld(SSyncNode* pSyncNode) { // start raft if (pSyncNode->replicaNum == 1) { - raftStoreNextTerm(pSyncNode->pRaftStore); + raftStoreNextTerm(pSyncNode); syncNodeBecomeLeader(pSyncNode, "one replica start"); // Raft 3.6.2 Committing entries from previous terms @@ -1288,10 +1287,6 @@ void syncNodeClose(SSyncNode* pSyncNode) { if (pSyncNode == NULL) return; sNInfo(pSyncNode, "sync close, node:%p", pSyncNode); - int32_t ret = raftStoreClose(pSyncNode->pRaftStore); - ASSERT(ret == 0); - pSyncNode->pRaftStore = NULL; - syncNodeLogReplMgrDestroy(pSyncNode); syncRespMgrDestroy(pSyncNode->pSyncRespMgr); pSyncNode->pSyncRespMgr = NULL; @@ -1714,39 +1709,39 @@ _END: // raft state change -------------- void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { - if (term > pSyncNode->pRaftStore->currentTerm) { - raftStoreSetTerm(pSyncNode->pRaftStore, term); + if (term > pSyncNode->raftStore.currentTerm) { + raftStoreSetTerm(pSyncNode, term); char tmpBuf[64]; snprintf(tmpBuf, sizeof(tmpBuf), "update term to %" PRId64, term); syncNodeBecomeFollower(pSyncNode, tmpBuf); - raftStoreClearVote(pSyncNode->pRaftStore); + raftStoreClearVote(pSyncNode); } } void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) { - if (term > pSyncNode->pRaftStore->currentTerm) { - raftStoreSetTerm(pSyncNode->pRaftStore, term); + if (term > pSyncNode->raftStore.currentTerm) { + raftStoreSetTerm(pSyncNode, term); } } void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) { - if (pSyncNode->pRaftStore->currentTerm > newTerm) { + if (pSyncNode->raftStore.currentTerm > newTerm) { sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, - pSyncNode->pRaftStore->currentTerm); + pSyncNode->raftStore.currentTerm); return; } do { sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, - pSyncNode->pRaftStore->currentTerm); + pSyncNode->raftStore.currentTerm); } while (0); - if (pSyncNode->pRaftStore->currentTerm < newTerm) { - raftStoreSetTerm(pSyncNode->pRaftStore, newTerm); + if (pSyncNode->raftStore.currentTerm < newTerm) { + raftStoreSetTerm(pSyncNode, newTerm); char tmpBuf[64]; snprintf(tmpBuf, sizeof(tmpBuf), "step down, update term to %" PRId64, newTerm); syncNodeBecomeFollower(pSyncNode, tmpBuf); - raftStoreClearVote(pSyncNode->pRaftStore); + raftStoreClearVote(pSyncNode); } else { if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) { @@ -1904,7 +1899,7 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) { SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); ASSERT(lastIndex >= 0); sInfo("vgId:%d, become leader. term: %" PRId64 ", commit index: %" PRId64 ", last index: %" PRId64 "", - pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, lastIndex); + pSyncNode->vgId, pSyncNode->raftStore.currentTerm, pSyncNode->commitIndex, lastIndex); } void syncNodeCandidate2LeaderOld(SSyncNode* pSyncNode) { @@ -1937,7 +1932,7 @@ void syncNodeFollower2Candidate(SSyncNode* pSyncNode) { pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE; SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); sInfo("vgId:%d, become candidate from follower. term: %" PRId64 ", commit index: %" PRId64 ", last index: %" PRId64, - pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, lastIndex); + pSyncNode->vgId, pSyncNode->raftStore.currentTerm, pSyncNode->commitIndex, lastIndex); sNTrace(pSyncNode, "follower to candidate"); } @@ -1947,7 +1942,7 @@ void syncNodeLeader2Follower(SSyncNode* pSyncNode) { syncNodeBecomeFollower(pSyncNode, "leader to follower"); SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); sInfo("vgId:%d, become follower from leader. term: %" PRId64 ", commit index: %" PRId64 ", last index: %" PRId64, - pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, lastIndex); + pSyncNode->vgId, pSyncNode->raftStore.currentTerm, pSyncNode->commitIndex, lastIndex); sNTrace(pSyncNode, "leader to follower"); } @@ -1957,7 +1952,7 @@ void syncNodeCandidate2Follower(SSyncNode* pSyncNode) { syncNodeBecomeFollower(pSyncNode, "candidate to follower"); SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); sInfo("vgId:%d, become follower from candidate. term: %" PRId64 ", commit index: %" PRId64 ", last index: %" PRId64, - pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, lastIndex); + pSyncNode->vgId, pSyncNode->raftStore.currentTerm, pSyncNode->commitIndex, lastIndex); sNTrace(pSyncNode, "candidate to follower"); } @@ -1965,15 +1960,15 @@ void syncNodeCandidate2Follower(SSyncNode* pSyncNode) { // just called by syncNodeVoteForSelf // need assert void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) { - ASSERT(term == pSyncNode->pRaftStore->currentTerm); - ASSERT(!raftStoreHasVoted(pSyncNode->pRaftStore)); + ASSERT(term == pSyncNode->raftStore.currentTerm); + ASSERT(!raftStoreHasVoted(pSyncNode)); - raftStoreVote(pSyncNode->pRaftStore, pRaftId); + raftStoreVote(pSyncNode, pRaftId); } // simulate get vote from outside void syncNodeVoteForSelf(SSyncNode* pSyncNode) { - syncNodeVoteForTerm(pSyncNode, pSyncNode->pRaftStore->currentTerm, &pSyncNode->myRaftId); + syncNodeVoteForTerm(pSyncNode, pSyncNode->raftStore.currentTerm, &pSyncNode->myRaftId); SRpcMsg rpcMsg = {0}; int32_t ret = syncBuildRequestVoteReply(&rpcMsg, pSyncNode->vgId); @@ -1982,7 +1977,7 @@ void syncNodeVoteForSelf(SSyncNode* pSyncNode) { SyncRequestVoteReply* pMsg = rpcMsg.pCont; pMsg->srcId = pSyncNode->myRaftId; pMsg->destId = pSyncNode->myRaftId; - pMsg->term = pSyncNode->pRaftStore->currentTerm; + pMsg->term = pSyncNode->raftStore.currentTerm; pMsg->voteGranted = true; voteGrantedVote(pSyncNode->pVotesGranted, pMsg); @@ -2272,13 +2267,6 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { return; } - if (pSyncNode->pRaftStore == NULL) { - syncNodeRelease(pSyncNode); - syncHbTimerDataRelease(pData); - sError("vgId:%d, hb timer raft store already stop", pSyncNode->vgId); - return; - } - // sTrace("vgId:%d, eq peer hb timer", pSyncNode->vgId); if (pSyncNode->replicaNum > 1) { @@ -2302,7 +2290,7 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { SyncHeartbeat* pSyncMsg = rpcMsg.pCont; pSyncMsg->srcId = pSyncNode->myRaftId; pSyncMsg->destId = pData->destId; - pSyncMsg->term = pSyncNode->pRaftStore->currentTerm; + pSyncMsg->term = pSyncNode->raftStore.currentTerm; pSyncMsg->commitIndex = pSyncNode->commitIndex; pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode); pSyncMsg->privateTerm = 0; @@ -2348,7 +2336,7 @@ static int32_t syncNodeEqNoop(SSyncNode* pNode) { } SyncIndex index = pNode->pLogStore->syncLogWriteIndex(pNode->pLogStore); - SyncTerm term = pNode->pRaftStore->currentTerm; + SyncTerm term = pNode->raftStore.currentTerm; SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, pNode->vgId); if (pEntry == NULL) return -1; @@ -2394,8 +2382,7 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) { if (syncLogBufferAppend(ths->pLogBuf, ths, pEntry) < 0) { sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index); terrno = TSDB_CODE_SYN_BUFFER_FULL; - (void)syncLogFsmExecute(ths, ths->pFsm, ths->state, ths->pRaftStore->currentTerm, pEntry, - TSDB_CODE_SYN_BUFFER_FULL); + (void)syncLogFsmExecute(ths, ths->pFsm, ths->state, ths->raftStore.currentTerm, pEntry, TSDB_CODE_SYN_BUFFER_FULL); syncEntryDestroy(pEntry); return -1; } @@ -2468,7 +2455,7 @@ bool syncNodeSnapshotRecving(SSyncNode* pSyncNode) { static int32_t syncNodeAppendNoop(SSyncNode* ths) { SyncIndex index = syncLogBufferGetEndIndex(ths->pLogBuf); - SyncTerm term = ths->pRaftStore->currentTerm; + SyncTerm term = ths->raftStore.currentTerm; SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId); if (pEntry == NULL) { @@ -2484,7 +2471,7 @@ static int32_t syncNodeAppendNoopOld(SSyncNode* ths) { int32_t ret = 0; SyncIndex index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore); - SyncTerm term = ths->pRaftStore->currentTerm; + SyncTerm term = ths->raftStore.currentTerm; SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId); ASSERT(pEntry != NULL); @@ -2526,12 +2513,12 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { SyncHeartbeatReply* pMsgReply = rpcMsg.pCont; pMsgReply->destId = pMsg->srcId; pMsgReply->srcId = ths->myRaftId; - pMsgReply->term = ths->pRaftStore->currentTerm; + pMsgReply->term = ths->raftStore.currentTerm; pMsgReply->privateTerm = 8864; // magic number pMsgReply->startTime = ths->startTime; pMsgReply->timeStamp = tsMs; - if (pMsg->term == ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) { + if (pMsg->term == ths->raftStore.currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) { syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs); syncNodeResetElectTimer(ths); @@ -2560,7 +2547,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { } } - if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) { + if (pMsg->term >= ths->raftStore.currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) { // syncNodeStepDown(ths, pMsg->term); SRpcMsg rpcMsgLocalCmd = {0}; (void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId); @@ -2687,7 +2674,7 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn int32_t code = 0; SyncIndex index = syncLogBufferGetEndIndex(ths->pLogBuf); - SyncTerm term = ths->pRaftStore->currentTerm; + SyncTerm term = ths->raftStore.currentTerm; SSyncRaftEntry* pEntry = NULL; if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { pEntry = syncEntryBuildFromClientRequest(pMsg->pCont, term, index); @@ -2721,7 +2708,7 @@ int32_t syncNodeOnClientRequestOld(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRe int32_t code = 0; SyncIndex index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore); - SyncTerm term = ths->pRaftStore->currentTerm; + SyncTerm term = ths->raftStore.currentTerm; SSyncRaftEntry* pEntry; if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { @@ -2755,7 +2742,7 @@ int32_t syncNodeOnClientRequestOld(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRe .state = ths->state, .seqNum = pEntry->seqNum, .term = pEntry->term, - .currentTerm = ths->pRaftStore->currentTerm, + .currentTerm = ths->raftStore.currentTerm, .flag = 0, }; ths->pFsm->FpCommitCb(ths->pFsm, pMsg, &cbMeta); @@ -2833,7 +2820,7 @@ int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* p return 0; } - if (pEntry->term < ths->pRaftStore->currentTerm) { + if (pEntry->term < ths->raftStore.currentTerm) { sNTrace(ths, "little term:%" PRId64 ", can not do leader transfer", pEntry->term); return 0; } @@ -2871,7 +2858,7 @@ int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* p if (ths->pFsm->FpLeaderTransferCb != NULL) { SFsmCbMeta cbMeta = { .code = 0, - .currentTerm = ths->pRaftStore->currentTerm, + .currentTerm = ths->raftStore.currentTerm, .flag = 0, .index = pEntry->index, .lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index), @@ -2987,7 +2974,7 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde .state = ths->state, .seqNum = pEntry->seqNum, .term = pEntry->term, - .currentTerm = ths->pRaftStore->currentTerm, + .currentTerm = ths->raftStore.currentTerm, .flag = flag, }; diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 467b4e2219..29f327c35c 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -176,7 +176,7 @@ int32_t syncBuildAppendEntriesFromRaftLog(SSyncNode* pNode, SSyncRaftEntry* pEnt pMsg->prevLogTerm = prevLogTerm; pMsg->vgId = pNode->vgId; pMsg->srcId = pNode->myRaftId; - pMsg->term = pNode->pRaftStore->currentTerm; + pMsg->term = pNode->raftStore.currentTerm; pMsg->commitIndex = pNode->commitIndex; pMsg->privateTerm = 0; return 0; diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index f878044bca..b1f955b8df 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -285,9 +285,9 @@ SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf) { int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm) { taosThreadMutexLock(&pBuf->mutex); syncLogBufferValidate(pBuf); - int32_t ret = -1; - SyncIndex index = pEntry->index; - SyncIndex prevIndex = pEntry->index - 1; + int32_t ret = -1; + SyncIndex index = pEntry->index; + SyncIndex prevIndex = pEntry->index - 1; SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTermWithoutLock(pBuf); SSyncRaftEntry* pExist = NULL; bool inBuf = true; @@ -509,7 +509,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm SSyncLogStore* pLogStore = pNode->pLogStore; SSyncFSM* pFsm = pNode->pFsm; ESyncState role = pNode->state; - SyncTerm term = pNode->pRaftStore->currentTerm; + SyncTerm term = pNode->raftStore.currentTerm; SyncGroupId vgId = pNode->vgId; int32_t ret = -1; int64_t upperIndex = TMIN(commitIndex, pBuf->matchIndex); @@ -571,7 +571,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm _out: // mark as restored if needed if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex && pEntry != NULL && - pNode->pRaftStore->currentTerm <= pEntry->term) { + pNode->raftStore.currentTerm <= pEntry->term) { pNode->pFsm->FpRestoreFinishCb(pNode->pFsm); pNode->restoreFinish = true; sInfo("vgId:%d, restore finished. log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, @@ -614,9 +614,9 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { return -1; } - int32_t ret = -1; - bool retried = false; - int64_t retryWaitMs = syncLogGetRetryBackoffTimeMs(pMgr); + int32_t ret = -1; + bool retried = false; + int64_t retryWaitMs = syncLogGetRetryBackoffTimeMs(pMgr); int64_t nowMs = taosGetMonoTimestampMs(); int count = 0; int64_t firstIndex = -1; @@ -807,9 +807,9 @@ int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode } (void)syncLogReplMgrReset(pMgr); - SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; - bool barrier = false; - SyncTerm term = -1; + SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; + bool barrier = false; + SyncTerm term = -1; if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId, terrstr(), index, pDestId->addr); @@ -836,11 +836,11 @@ int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { ASSERT(pMgr->restored); - SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; - int32_t batchSize = TMAX(1, pMgr->size >> (4 + pMgr->retryBackoff)); - int32_t count = 0; - int64_t nowMs = taosGetMonoTimestampMs(); - int64_t limit = pMgr->size >> 1; + SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; + int32_t batchSize = TMAX(1, pMgr->size >> (4 + pMgr->retryBackoff)); + int32_t count = 0; + int64_t nowMs = taosGetMonoTimestampMs(); + int64_t limit = pMgr->size >> 1; SyncTerm term = -1; SyncIndex firstIndex = -1; @@ -891,13 +891,13 @@ int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* p int32_t syncLogReplMgrProcessReplyInNormalMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) { ASSERT(pMgr->restored == true); if (pMgr->startIndex <= pMsg->lastSendIndex && pMsg->lastSendIndex < pMgr->endIndex) { - if (pMgr->startIndex < pMgr->matchIndex && pMgr->retryBackoff > 0) { - int64_t firstSentMs = pMgr->states[pMgr->startIndex % pMgr->size].timeMs; - int64_t lastSentMs = pMgr->states[(pMgr->endIndex - 1) % pMgr->size].timeMs; - int64_t timeDiffMs = lastSentMs - firstSentMs; - if (timeDiffMs > 0 && timeDiffMs < (SYNC_LOG_REPL_RETRY_WAIT_MS << (pMgr->retryBackoff - 1))) { - pMgr->retryBackoff -= 1; - } + if (pMgr->startIndex < pMgr->matchIndex && pMgr->retryBackoff > 0) { + int64_t firstSentMs = pMgr->states[pMgr->startIndex % pMgr->size].timeMs; + int64_t lastSentMs = pMgr->states[(pMgr->endIndex - 1) % pMgr->size].timeMs; + int64_t timeDiffMs = lastSentMs - firstSentMs; + if (timeDiffMs > 0 && timeDiffMs < (SYNC_LOG_REPL_RETRY_WAIT_MS << (pMgr->retryBackoff - 1))) { + pMgr->retryBackoff -= 1; + } } pMgr->states[pMsg->lastSendIndex % pMgr->size].acked = true; pMgr->matchIndex = TMAX(pMgr->matchIndex, pMsg->matchIndex); diff --git a/source/libs/sync/src/syncRaftStore.c b/source/libs/sync/src/syncRaftStore.c index b19cda2a44..197d1463fd 100644 --- a/source/libs/sync/src/syncRaftStore.c +++ b/source/libs/sync/src/syncRaftStore.c @@ -16,156 +16,161 @@ #define _DEFAULT_SOURCE #include "syncRaftStore.h" #include "syncUtil.h" +#include "tjson.h" -// private function -static int32_t raftStoreInit(SRaftStore *pRaftStore); -static bool raftStoreFileExist(char *path); +static int32_t raftStoreDecode(const SJson *pJson, SRaftStore *pStore) { + int32_t code = 0; -// public function -SRaftStore *raftStoreOpen(const char *path) { - int32_t ret; + tjsonGetNumberValue(pJson, "current_term", pStore->currentTerm, code); + if (code < 0) return -1; + tjsonGetNumberValue(pJson, "vote_for_addr", pStore->voteFor.addr, code); + if (code < 0) return -1; + tjsonGetInt32ValueFromDouble(pJson, "vote_for_vgid", pStore->voteFor.vgId, code); + if (code < 0) return -1; - SRaftStore *pRaftStore = taosMemoryCalloc(1, sizeof(SRaftStore)); - if (pRaftStore == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - - snprintf(pRaftStore->path, sizeof(pRaftStore->path), "%s", path); - if (!raftStoreFileExist(pRaftStore->path)) { - ret = raftStoreInit(pRaftStore); - ASSERT(ret == 0); - } - - char storeBuf[RAFT_STORE_BLOCK_SIZE] = {0}; - pRaftStore->pFile = taosOpenFile(path, TD_FILE_READ | TD_FILE_WRITE); - ASSERT(pRaftStore->pFile != NULL); - - int len = taosReadFile(pRaftStore->pFile, storeBuf, RAFT_STORE_BLOCK_SIZE); - ASSERT(len > 0); - - ret = raftStoreDeserialize(pRaftStore, storeBuf, len); - ASSERT(ret == 0); - - return pRaftStore; -} - -static int32_t raftStoreInit(SRaftStore *pRaftStore) { - ASSERT(pRaftStore != NULL); - - pRaftStore->pFile = taosOpenFile(pRaftStore->path, TD_FILE_CREATE | TD_FILE_WRITE); - ASSERT(pRaftStore->pFile != NULL); - - pRaftStore->currentTerm = 0; - pRaftStore->voteFor.addr = 0; - pRaftStore->voteFor.vgId = 0; - - int32_t ret = raftStorePersist(pRaftStore); - ASSERT(ret == 0); - - taosCloseFile(&pRaftStore->pFile); - return 0; -} - -int32_t raftStoreClose(SRaftStore *pRaftStore) { - if (pRaftStore == NULL) return 0; - - taosCloseFile(&pRaftStore->pFile); - taosMemoryFree(pRaftStore); - pRaftStore = NULL; return 0; } -int32_t raftStorePersist(SRaftStore *pRaftStore) { - ASSERT(pRaftStore != NULL); - - int32_t ret; - char storeBuf[RAFT_STORE_BLOCK_SIZE] = {0}; - ret = raftStoreSerialize(pRaftStore, storeBuf, sizeof(storeBuf)); - ASSERT(ret == 0); +int32_t raftStoreReadFile(SSyncNode *pNode) { + int32_t code = -1; + TdFilePtr pFile = NULL; + char *pData = NULL; + SJson *pJson = NULL; + const char *file = pNode->raftStorePath; + SRaftStore *pStore = &pNode->raftStore; + + if (taosStatFile(file, NULL, NULL) < 0) { + sInfo("vgId:%d, raft store file:%s not exist, use default value", pNode->vgId, file); + pStore->currentTerm = 0; + pStore->voteFor.addr = 0; + pStore->voteFor.vgId = 0; + return raftStoreWriteFile(pNode); + } - taosLSeekFile(pRaftStore->pFile, 0, SEEK_SET); + pFile = taosOpenFile(file, TD_FILE_READ); + if (pFile == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + sError("vgId:%d, failed to open raft store file:%s since %s", pNode->vgId, file, terrstr()); + goto _OVER; + } - ret = taosWriteFile(pRaftStore->pFile, storeBuf, sizeof(storeBuf)); - ASSERT(ret == RAFT_STORE_BLOCK_SIZE); + int64_t size = 0; + if (taosFStatFile(pFile, &size, NULL) < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + sError("vgId:%d, failed to fstat raft store file:%s since %s", pNode->vgId, file, terrstr()); + goto _OVER; + } - taosFsyncFile(pRaftStore->pFile); - return 0; -} + pData = taosMemoryMalloc(size + 1); + if (pData == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _OVER; + } -static bool raftStoreFileExist(char *path) { - bool b = taosStatFile(path, NULL, NULL) >= 0; - return b; -} + if (taosReadFile(pFile, pData, size) != size) { + terrno = TAOS_SYSTEM_ERROR(errno); + sError("vgId:%d, failed to read raft store file:%s since %s", pNode->vgId, file, terrstr()); + goto _OVER; + } -int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) { - ASSERT(pRaftStore != NULL); + pData[size] = '\0'; - cJSON *pRoot = cJSON_CreateObject(); + pJson = tjsonParse(pData); + if (pJson == NULL) { + terrno = TSDB_CODE_INVALID_JSON_FORMAT; + goto _OVER; + } - char u64Buf[128] = {0}; - snprintf(u64Buf, sizeof(u64Buf), "%" PRIu64 "", pRaftStore->currentTerm); - cJSON_AddStringToObject(pRoot, "current_term", u64Buf); + if (raftStoreDecode(pJson, pStore) < 0) { + terrno = TSDB_CODE_INVALID_JSON_FORMAT; + goto _OVER; + } - snprintf(u64Buf, sizeof(u64Buf), "%" PRIu64 "", pRaftStore->voteFor.addr); - cJSON_AddStringToObject(pRoot, "vote_for_addr", u64Buf); + code = 0; + sInfo("vgId:%d, succceed to read raft store file %s", pNode->vgId, file); - cJSON_AddNumberToObject(pRoot, "vote_for_vgid", pRaftStore->voteFor.vgId); +_OVER: + if (pData != NULL) taosMemoryFree(pData); + if (pJson != NULL) cJSON_Delete(pJson); + if (pFile != NULL) taosCloseFile(&pFile); - char *serialized = cJSON_Print(pRoot); - int len2 = strlen(serialized); - ASSERT(len2 < len); - memset(buf, 0, len); - snprintf(buf, len, "%s", serialized); - taosMemoryFree(serialized); + if (code != 0) { + sError("vgId:%d, failed to read raft store file:%s since %s", pNode->vgId, file, terrstr()); + } + return code; +} - cJSON_Delete(pRoot); +static int32_t raftStoreEncode(SJson *pJson, SRaftStore *pStore) { + if (tjsonAddIntegerToObject(pJson, "current_term", pStore->currentTerm) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "vote_for_addr", pStore->voteFor.addr) < 0) return -1; + if (tjsonAddDoubleToObject(pJson, "vote_for_vgid", pStore->voteFor.vgId) < 0) return -1; return 0; } -int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) { - ASSERT(pRaftStore != NULL); - - ASSERT(len > 0 && len <= RAFT_STORE_BLOCK_SIZE); - cJSON *pRoot = cJSON_Parse(buf); - - cJSON *pCurrentTerm = cJSON_GetObjectItem(pRoot, "current_term"); - ASSERT(cJSON_IsString(pCurrentTerm)); - sscanf(pCurrentTerm->valuestring, "%" PRIu64 "", &(pRaftStore->currentTerm)); - - cJSON *pVoteForAddr = cJSON_GetObjectItem(pRoot, "vote_for_addr"); - ASSERT(cJSON_IsString(pVoteForAddr)); - sscanf(pVoteForAddr->valuestring, "%" PRIu64 "", &(pRaftStore->voteFor.addr)); - - cJSON *pVoteForVgid = cJSON_GetObjectItem(pRoot, "vote_for_vgid"); - pRaftStore->voteFor.vgId = pVoteForVgid->valueint; - - cJSON_Delete(pRoot); - return 0; +int32_t raftStoreWriteFile(SSyncNode *pNode) { + int32_t code = -1; + char *buffer = NULL; + SJson *pJson = NULL; + TdFilePtr pFile = NULL; + const char *realfile = pNode->raftStorePath; + SRaftStore *pStore = &pNode->raftStore; + char file[PATH_MAX] = {0}; + snprintf(file, sizeof(file), "%s.bak", realfile); + + terrno = TSDB_CODE_OUT_OF_MEMORY; + pJson = tjsonCreateObject(); + if (pJson == NULL) goto _OVER; + if (raftStoreEncode(pJson, pStore) != 0) goto _OVER; + buffer = tjsonToString(pJson); + if (buffer == NULL) goto _OVER; + terrno = 0; + + pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); + if (pFile == NULL) goto _OVER; + + int32_t len = strlen(buffer); + if (taosWriteFile(pFile, buffer, len) <= 0) goto _OVER; + if (taosFsyncFile(pFile) < 0) goto _OVER; + + taosCloseFile(&pFile); + if (taosRenameFile(file, realfile) != 0) goto _OVER; + + code = 0; + sInfo("vgId:%d, succeed to write raft store file:%s, len:%d", pNode->vgId, realfile, len); + +_OVER: + if (pJson != NULL) tjsonDelete(pJson); + if (buffer != NULL) taosMemoryFree(buffer); + if (pFile != NULL) taosCloseFile(&pFile); + + if (code != 0) { + if (terrno == 0) terrno = TAOS_SYSTEM_ERROR(errno); + sError("vgId:%d, failed to write raft store file:%s since %s", pNode->vgId, realfile, terrstr()); + } + return code; } -bool raftStoreHasVoted(SRaftStore *pRaftStore) { - bool b = syncUtilEmptyId(&(pRaftStore->voteFor)); +bool raftStoreHasVoted(SSyncNode *pNode) { + bool b = syncUtilEmptyId(&pNode->raftStore.voteFor); return (!b); } -void raftStoreVote(SRaftStore *pRaftStore, SRaftId *pRaftId) { - ASSERT(!syncUtilEmptyId(pRaftId)); - pRaftStore->voteFor = *pRaftId; - raftStorePersist(pRaftStore); +void raftStoreVote(SSyncNode *pNode, SRaftId *pRaftId) { + pNode->raftStore.voteFor = *pRaftId; + (void)raftStoreWriteFile(pNode); } -void raftStoreClearVote(SRaftStore *pRaftStore) { - pRaftStore->voteFor = EMPTY_RAFT_ID; - raftStorePersist(pRaftStore); +void raftStoreClearVote(SSyncNode *pNode) { + pNode->raftStore.voteFor = EMPTY_RAFT_ID; + (void)raftStoreWriteFile(pNode); } -void raftStoreNextTerm(SRaftStore *pRaftStore) { - ++(pRaftStore->currentTerm); - raftStorePersist(pRaftStore); +void raftStoreNextTerm(SSyncNode *pNode) { + pNode->raftStore.currentTerm++; + (void)raftStoreWriteFile(pNode); } -void raftStoreSetTerm(SRaftStore *pRaftStore, SyncTerm term) { - pRaftStore->currentTerm = term; - raftStorePersist(pRaftStore); +void raftStoreSetTerm(SSyncNode *pNode, SyncTerm term) { + pNode->raftStore.currentTerm = term; + (void)raftStoreWriteFile(pNode); } diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index e3058768f8..1aa476e84e 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -122,7 +122,7 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapsh ASSERT(pMsg != NULL); pMsg->srcId = pSyncNode->myRaftId; pMsg->destId = *pDestId; - pMsg->term = pSyncNode->pRaftStore->currentTerm; + pMsg->term = pSyncNode->raftStore.currentTerm; pMsg->prevLogIndex = preLogIndex; pMsg->prevLogTerm = preLogTerm; pMsg->commitIndex = pSyncNode->commitIndex; @@ -245,7 +245,7 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) { SyncHeartbeat* pSyncMsg = rpcMsg.pCont; pSyncMsg->srcId = pSyncNode->myRaftId; pSyncMsg->destId = pSyncNode->peersId[i]; - pSyncMsg->term = pSyncNode->pRaftStore->currentTerm; + pSyncMsg->term = pSyncNode->raftStore.currentTerm; pSyncMsg->commitIndex = pSyncNode->commitIndex; pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode); pSyncMsg->privateTerm = 0; diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index 773befe1e4..e9a18dfe86 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -44,12 +44,12 @@ // /\ UNCHANGED <> // -static bool syncNodeOnRequestVoteLogOK(SSyncNode* pSyncNode, SyncRequestVote* pMsg) { - SyncTerm myLastTerm = syncNodeGetLastTerm(pSyncNode); - SyncIndex myLastIndex = syncNodeGetLastIndex(pSyncNode); +static bool syncNodeOnRequestVoteLogOK(SSyncNode* ths, SyncRequestVote* pMsg) { + SyncTerm myLastTerm = syncNodeGetLastTerm(ths); + SyncIndex myLastIndex = syncNodeGetLastIndex(ths); - if (pMsg->lastLogIndex < pSyncNode->commitIndex) { - sNTrace(pSyncNode, + if (pMsg->lastLogIndex < ths->commitIndex) { + sNTrace(ths, "logok:0, {my-lterm:%" PRIu64 ", my-lindex:%" PRId64 ", recv-lterm:%" PRIu64 ", recv-lindex:%" PRId64 ", recv-term:%" PRIu64 "}", myLastTerm, myLastIndex, pMsg->lastLogTerm, pMsg->lastLogIndex, pMsg->term); @@ -58,7 +58,7 @@ static bool syncNodeOnRequestVoteLogOK(SSyncNode* pSyncNode, SyncRequestVote* pM } if (myLastTerm == SYNC_TERM_INVALID) { - sNTrace(pSyncNode, + sNTrace(ths, "logok:0, {my-lterm:%" PRIu64 ", my-lindex:%" PRId64 ", recv-lterm:%" PRIu64 ", recv-lindex:%" PRId64 ", recv-term:%" PRIu64 "}", myLastTerm, myLastIndex, pMsg->lastLogTerm, pMsg->lastLogIndex, pMsg->term); @@ -66,7 +66,7 @@ static bool syncNodeOnRequestVoteLogOK(SSyncNode* pSyncNode, SyncRequestVote* pM } if (pMsg->lastLogTerm > myLastTerm) { - sNTrace(pSyncNode, + sNTrace(ths, "logok:1, {my-lterm:%" PRIu64 ", my-lindex:%" PRId64 ", recv-lterm:%" PRIu64 ", recv-lindex:%" PRId64 ", recv-term:%" PRIu64 "}", myLastTerm, myLastIndex, pMsg->lastLogTerm, pMsg->lastLogIndex, pMsg->term); @@ -74,14 +74,14 @@ static bool syncNodeOnRequestVoteLogOK(SSyncNode* pSyncNode, SyncRequestVote* pM } if (pMsg->lastLogTerm == myLastTerm && pMsg->lastLogIndex >= myLastIndex) { - sNTrace(pSyncNode, + sNTrace(ths, "logok:1, {my-lterm:%" PRIu64 ", my-lindex:%" PRId64 ", recv-lterm:%" PRIu64 ", recv-lindex:%" PRId64 ", recv-term:%" PRIu64 "}", myLastTerm, myLastIndex, pMsg->lastLogTerm, pMsg->lastLogIndex, pMsg->term); return true; } - sNTrace(pSyncNode, + sNTrace(ths, "logok:0, {my-lterm:%" PRIu64 ", my-lindex:%" PRId64 ", recv-lterm:%" PRIu64 ", recv-lindex:%" PRId64 ", recv-term:%" PRIu64 "}", myLastTerm, myLastIndex, pMsg->lastLogTerm, pMsg->lastLogIndex, pMsg->term); @@ -93,7 +93,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { SyncRequestVote* pMsg = pRpcMsg->pCont; // if already drop replica, do not process - if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) { + if (!syncNodeInRaftGroup(ths, &pMsg->srcId)) { syncLogRecvRequestVote(ths, pMsg, -1, "not in my config"); return -1; } @@ -101,21 +101,21 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { bool logOK = syncNodeOnRequestVoteLogOK(ths, pMsg); // maybe update term - if (pMsg->term > ths->pRaftStore->currentTerm) { + if (pMsg->term > ths->raftStore.currentTerm) { syncNodeStepDown(ths, pMsg->term); // syncNodeUpdateTerm(ths, pMsg->term); } - ASSERT(pMsg->term <= ths->pRaftStore->currentTerm); + ASSERT(pMsg->term <= ths->raftStore.currentTerm); - bool grant = (pMsg->term == ths->pRaftStore->currentTerm) && logOK && - ((!raftStoreHasVoted(ths->pRaftStore)) || (syncUtilSameId(&(ths->pRaftStore->voteFor), &(pMsg->srcId)))); + bool grant = (pMsg->term == ths->raftStore.currentTerm) && logOK && + ((!raftStoreHasVoted(ths)) || (syncUtilSameId(&ths->raftStore.voteFor, &pMsg->srcId))); if (grant) { // maybe has already voted for pMsg->srcId // vote again, no harm - raftStoreVote(ths->pRaftStore, &(pMsg->srcId)); + raftStoreVote(ths, &(pMsg->srcId)); // candidate ? - syncNodeStepDown(ths, ths->pRaftStore->currentTerm); + syncNodeStepDown(ths, ths->raftStore.currentTerm); // forbid elect for this round syncNodeResetElectTimer(ths); @@ -129,7 +129,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { SyncRequestVoteReply* pReply = rpcMsg.pCont; pReply->srcId = ths->myRaftId; pReply->destId = pMsg->srcId; - pReply->term = ths->pRaftStore->currentTerm; + pReply->term = ths->raftStore.currentTerm; pReply->voteGranted = grant; // trace log diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c index 563f475070..a0d6cbf597 100644 --- a/source/libs/sync/src/syncRequestVoteReply.c +++ b/source/libs/sync/src/syncRequestVoteReply.c @@ -49,25 +49,25 @@ int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { } // drop stale response - if (pMsg->term < ths->pRaftStore->currentTerm) { + if (pMsg->term < ths->raftStore.currentTerm) { syncLogRecvRequestVoteReply(ths, pMsg, "drop stale response"); return -1; } - // ASSERT(!(pMsg->term > ths->pRaftStore->currentTerm)); + // ASSERT(!(pMsg->term > ths->raftStore.currentTerm)); // no need this code, because if I receive reply.term, then I must have sent for that term. - // if (pMsg->term > ths->pRaftStore->currentTerm) { + // if (pMsg->term > ths->raftStore.currentTerm) { // syncNodeUpdateTerm(ths, pMsg->term); // } - if (pMsg->term > ths->pRaftStore->currentTerm) { + if (pMsg->term > ths->raftStore.currentTerm) { syncLogRecvRequestVoteReply(ths, pMsg, "error term"); syncNodeStepDown(ths, pMsg->term); return -1; } syncLogRecvRequestVoteReply(ths, pMsg, ""); - ASSERT(pMsg->term == ths->pRaftStore->currentTerm); + ASSERT(pMsg->term == ths->raftStore.currentTerm); // This tallies votes even when the current state is not Candidate, // but they won't be looked at, so it doesn't matter. diff --git a/source/libs/sync/src/syncRespMgr.c b/source/libs/sync/src/syncRespMgr.c index b55aae4c76..9373eccaef 100644 --- a/source/libs/sync/src/syncRespMgr.c +++ b/source/libs/sync/src/syncRespMgr.c @@ -143,7 +143,7 @@ static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) { .state = pNode->state, .seqNum = *pSeqNum, .term = SYNC_TERM_INVALID, - .currentTerm = pNode->pRaftStore->currentTerm, + .currentTerm = pNode->raftStore.currentTerm, .flag = 0, }; diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index defb7402f4..880c76e4dd 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -43,7 +43,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS; pSender->pSyncNode = pSyncNode; pSender->replicaIndex = replicaIndex; - pSender->term = pSyncNode->pRaftStore->currentTerm; + pSender->term = pSyncNode->raftStore.currentTerm; pSender->startTime = 0; pSender->endTime = 0; pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot); @@ -90,7 +90,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { memset(&pSender->lastConfig, 0, sizeof(pSender->lastConfig)); pSender->sendingMS = 0; - pSender->term = pSender->pSyncNode->pRaftStore->currentTerm; + pSender->term = pSender->pSyncNode->raftStore.currentTerm; pSender->startTime = taosGetTimestampMs(); pSender->lastSendTime = pSender->startTime; pSender->finish = false; @@ -105,7 +105,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { SyncSnapshotSend *pMsg = rpcMsg.pCont; pMsg->srcId = pSender->pSyncNode->myRaftId; pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; - pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm; + pMsg->term = pSender->pSyncNode->raftStore.currentTerm; pMsg->beginIndex = pSender->snapshotParam.start; pMsg->lastIndex = pSender->snapshot.lastApplyIndex; pMsg->lastTerm = pSender->snapshot.lastApplyTerm; @@ -185,7 +185,7 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) { SyncSnapshotSend *pMsg = rpcMsg.pCont; pMsg->srcId = pSender->pSyncNode->myRaftId; pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; - pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm; + pMsg->term = pSender->pSyncNode->raftStore.currentTerm; pMsg->beginIndex = pSender->snapshotParam.start; pMsg->lastIndex = pSender->snapshot.lastApplyIndex; pMsg->lastTerm = pSender->snapshot.lastApplyTerm; @@ -226,7 +226,7 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { SyncSnapshotSend *pMsg = rpcMsg.pCont; pMsg->srcId = pSender->pSyncNode->myRaftId; pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; - pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm; + pMsg->term = pSender->pSyncNode->raftStore.currentTerm; pMsg->beginIndex = pSender->snapshotParam.start; pMsg->lastIndex = pSender->snapshot.lastApplyIndex; pMsg->lastTerm = pSender->snapshot.lastApplyTerm; @@ -314,7 +314,7 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from pReceiver->pWriter = NULL; pReceiver->pSyncNode = pSyncNode; pReceiver->fromId = fromId; - pReceiver->term = pSyncNode->pRaftStore->currentTerm; + pReceiver->term = pSyncNode->raftStore.currentTerm; pReceiver->snapshot.data = NULL; pReceiver->snapshot.lastApplyIndex = SYNC_INDEX_INVALID; pReceiver->snapshot.lastApplyTerm = 0; @@ -380,7 +380,7 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *p pReceiver->start = true; pReceiver->ack = SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT; - pReceiver->term = pReceiver->pSyncNode->pRaftStore->currentTerm; + pReceiver->term = pReceiver->pSyncNode->raftStore.currentTerm; pReceiver->fromId = pPreMsg->srcId; pReceiver->startTime = pPreMsg->startTime; @@ -437,9 +437,9 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap } // maybe update term - if (pReceiver->snapshot.lastApplyTerm > pReceiver->pSyncNode->pRaftStore->currentTerm) { - pReceiver->pSyncNode->pRaftStore->currentTerm = pReceiver->snapshot.lastApplyTerm; - raftStorePersist(pReceiver->pSyncNode->pRaftStore); + if (pReceiver->snapshot.lastApplyTerm > pReceiver->pSyncNode->raftStore.currentTerm) { + pReceiver->pSyncNode->raftStore.currentTerm = pReceiver->snapshot.lastApplyTerm; + (void)raftStoreWriteFile(pReceiver->pSyncNode); } // stop writer, apply data @@ -592,7 +592,7 @@ _SEND_REPLY: SyncSnapshotRsp *pRspMsg = rpcMsg.pCont; pRspMsg->srcId = pSyncNode->myRaftId; pRspMsg->destId = pMsg->srcId; - pRspMsg->term = pSyncNode->pRaftStore->currentTerm; + pRspMsg->term = pSyncNode->raftStore.currentTerm; pRspMsg->lastIndex = pMsg->lastIndex; pRspMsg->lastTerm = pMsg->lastTerm; pRspMsg->startTime = pReceiver->startTime; @@ -648,7 +648,7 @@ _SEND_REPLY: SyncSnapshotRsp *pRspMsg = rpcMsg.pCont; pRspMsg->srcId = pSyncNode->myRaftId; pRspMsg->destId = pMsg->srcId; - pRspMsg->term = pSyncNode->pRaftStore->currentTerm; + pRspMsg->term = pSyncNode->raftStore.currentTerm; pRspMsg->lastIndex = pMsg->lastIndex; pRspMsg->lastTerm = pMsg->lastTerm; pRspMsg->startTime = pReceiver->startTime; @@ -698,7 +698,7 @@ static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend SyncSnapshotRsp *pRspMsg = rpcMsg.pCont; pRspMsg->srcId = pSyncNode->myRaftId; pRspMsg->destId = pMsg->srcId; - pRspMsg->term = pSyncNode->pRaftStore->currentTerm; + pRspMsg->term = pSyncNode->raftStore.currentTerm; pRspMsg->lastIndex = pMsg->lastIndex; pRspMsg->lastTerm = pMsg->lastTerm; pRspMsg->startTime = pReceiver->startTime; @@ -745,7 +745,7 @@ static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMs SyncSnapshotRsp *pRspMsg = rpcMsg.pCont; pRspMsg->srcId = pSyncNode->myRaftId; pRspMsg->destId = pMsg->srcId; - pRspMsg->term = pSyncNode->pRaftStore->currentTerm; + pRspMsg->term = pSyncNode->raftStore.currentTerm; pRspMsg->lastIndex = pMsg->lastIndex; pRspMsg->lastTerm = pMsg->lastTerm; pRspMsg->startTime = pReceiver->startTime; @@ -794,13 +794,13 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { return -1; } - if (pMsg->term < pSyncNode->pRaftStore->currentTerm) { + if (pMsg->term < pSyncNode->raftStore.currentTerm) { syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "reject since small term"); terrno = TSDB_CODE_SYN_INTERNAL_ERROR; return -1; } - if (pMsg->term > pSyncNode->pRaftStore->currentTerm) { + if (pMsg->term > pSyncNode->raftStore.currentTerm) { syncNodeStepDown(pSyncNode, pMsg->term); } syncNodeResetElectTimer(pSyncNode); @@ -808,7 +808,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { // state, term, seq/ack int32_t code = 0; if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) { - if (pMsg->term == pSyncNode->pRaftStore->currentTerm) { + if (pMsg->term == pSyncNode->raftStore.currentTerm) { if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT) { syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq pre-snapshot"); code = syncNodeOnSnapshotPre(pSyncNode, pMsg); @@ -892,7 +892,7 @@ static int32_t syncNodeOnSnapshotPreRsp(SSyncNode *pSyncNode, SSyncSnapshotSende SyncSnapshotSend *pSendMsg = rpcMsg.pCont; pSendMsg->srcId = pSender->pSyncNode->myRaftId; pSendMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; - pSendMsg->term = pSender->pSyncNode->pRaftStore->currentTerm; + pSendMsg->term = pSender->pSyncNode->raftStore.currentTerm; pSendMsg->beginIndex = pSender->snapshotParam.start; pSendMsg->lastIndex = pSender->snapshot.lastApplyIndex; pSendMsg->lastTerm = pSender->snapshot.lastApplyTerm; @@ -951,10 +951,10 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { goto _ERROR; } - if (pMsg->term != pSyncNode->pRaftStore->currentTerm) { + if (pMsg->term != pSyncNode->raftStore.currentTerm) { syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender and receiver term not match"); sSError(pSender, "snapshot sender term not equal, msg term:%" PRId64 " currentTerm:%" PRId64, pMsg->term, - pSyncNode->pRaftStore->currentTerm); + pSyncNode->raftStore.currentTerm); terrno = TSDB_CODE_SYN_INTERNAL_ERROR; goto _ERROR; } diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index e4a65837f7..b246d9a79d 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -158,8 +158,8 @@ static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) { } void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...) { - if (pNode == NULL || pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return; - int64_t currentTerm = pNode->pRaftStore->currentTerm; + if (pNode == NULL || pNode->pLogStore == NULL) return; + int64_t currentTerm = pNode->raftStore.currentTerm; // save error code, otherwise it will be overwritten int32_t errCode = terrno; @@ -228,7 +228,7 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotSender* pSender, const char* format, ...) { SSyncNode* pNode = pSender->pSyncNode; - if (pNode == NULL || pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return; + if (pNode == NULL || pNode->pLogStore == NULL) return; SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) { @@ -264,7 +264,7 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla pNode->vgId, eventLog, syncStr(pNode->state), pSender, pSender->snapshotParam.start, pSender->snapshotParam.end, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack, pSender->finish, pSender->replicaIndex, - DID(&pNode->replicasId[pSender->replicaIndex]), pNode->pRaftStore->currentTerm, pNode->commitIndex, + DID(&pNode->replicasId[pSender->replicaIndex]), pNode->raftStore.currentTerm, pNode->commitIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->raftCfg.isStandBy, pNode->raftCfg.snapshotStrategy, pNode->raftCfg.batchSize, pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, syncNodeDynamicQuorum(pNode), @@ -274,7 +274,7 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotReceiver* pReceiver, const char* format, ...) { SSyncNode* pNode = pReceiver->pSyncNode; - if (pNode == NULL || pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return; + if (pNode == NULL || pNode->pLogStore == NULL) return; SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) { @@ -311,7 +311,7 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df pNode->vgId, eventLog, syncStr(pNode->state), pReceiver, pReceiver->start, pReceiver->ack, pReceiver->term, pReceiver->startTime, DID(&pReceiver->fromId), pReceiver->snapshotParam.start, pReceiver->snapshotParam.end, pReceiver->snapshot.lastApplyIndex, pReceiver->snapshot.lastApplyTerm, - pReceiver->snapshot.lastConfigIndex, pNode->pRaftStore->currentTerm, pNode->commitIndex, logBeginIndex, + pReceiver->snapshot.lastConfigIndex, pNode->raftStore.currentTerm, pNode->commitIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->raftCfg.isStandBy, pNode->raftCfg.snapshotStrategy, pNode->raftCfg.batchSize, pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, syncNodeDynamicQuorum(pNode), diff --git a/source/libs/sync/test/sync_test_lib/src/syncMainDebug.c b/source/libs/sync/test/sync_test_lib/src/syncMainDebug.c index f1db2f0204..a3e76eabcc 100644 --- a/source/libs/sync/test/sync_test_lib/src/syncMainDebug.c +++ b/source/libs/sync/test/sync_test_lib/src/syncMainDebug.c @@ -80,7 +80,7 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) { // tla+ server vars cJSON_AddNumberToObject(pRoot, "state", pSyncNode->state); cJSON_AddStringToObject(pRoot, "state_str", syncStr(pSyncNode->state)); - cJSON_AddItemToObject(pRoot, "pRaftStore", raftStore2Json(pSyncNode->pRaftStore)); + cJSON_AddItemToObject(pRoot, "pRaftStore", raftStore2Json(&pSyncNode.raftStore)); // tla+ candidate vars cJSON_AddItemToObject(pRoot, "pVotesGranted", voteGranted2Json(pSyncNode->pVotesGranted)); @@ -199,7 +199,7 @@ inline char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { ", sby:%d, " "r-num:%d, " "lcfg:%" PRId64 ", chging:%d, rsto:%d", - pSyncNode->vgId, syncStr(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, + pSyncNode->vgId, syncStr(pSyncNode->state), pSyncNode->raftStore.currentTerm, pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, pSyncNode->raftCfg.isStandBy, pSyncNode->replicaNum, pSyncNode->raftCfg.lastConfigIndex, pSyncNode->changing, pSyncNode->restoreFinish); diff --git a/source/libs/sync/test/sync_test_lib/src/syncSnapshotDebug.c b/source/libs/sync/test/sync_test_lib/src/syncSnapshotDebug.c index f1237e5282..d8740de16a 100644 --- a/source/libs/sync/test/sync_test_lib/src/syncSnapshotDebug.c +++ b/source/libs/sync/test/sync_test_lib/src/syncSnapshotDebug.c @@ -137,7 +137,7 @@ int32_t syncNodeOnPreSnapshot(SSyncNode *ths, SyncPreSnapshot *pMsg) { SyncPreSnapshotReply *pMsgReply = syncPreSnapshotReplyBuild(ths->vgId); pMsgReply->srcId = ths->myRaftId; pMsgReply->destId = pMsg->srcId; - pMsgReply->term = ths->pRaftStore->currentTerm; + pMsgReply->term = ths->raftStore.currentTerm; SSyncLogStoreData *pData = ths->pLogStore->data; SWal *pWal = pData->pWal; -- GitLab