diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 7e08e195c1d255783da14967a26a4386b31e0b6b..6f2c1a1ad0e76d233d516e36ee419cae43fcc409 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -71,6 +71,7 @@ typedef struct SRaftId { typedef struct SRaftStore { SyncTerm currentTerm; SRaftId voteFor; + TdThreadMutex mutex; } SRaftStore; typedef struct SSyncHbTimerData { @@ -282,7 +283,7 @@ void syncNodeCandidate2Follower(SSyncNode* pSyncNode); // raft vote -------------- void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId); -void syncNodeVoteForSelf(SSyncNode* pSyncNode); +void syncNodeVoteForSelf(SSyncNode* pSyncNode, SyncTerm term); // log replication SSyncLogReplMgr* syncNodeGetLogReplMgr(SSyncNode* pNode, SRaftId* pDestId); diff --git a/source/libs/sync/inc/syncRaftStore.h b/source/libs/sync/inc/syncRaftStore.h index 21a8fc64a811a832bc3893223b4c1940a6638f54..38a8ed234b8dcfa04454230b6224ff2b838b3fb6 100644 --- a/source/libs/sync/inc/syncRaftStore.h +++ b/source/libs/sync/inc/syncRaftStore.h @@ -26,14 +26,15 @@ extern "C" { #define RAFT_STORE_PATH_LEN (TSDB_FILENAME_LEN * 2) #define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0}) -int32_t raftStoreReadFile(SSyncNode *pNode); -int32_t raftStoreWriteFile(SSyncNode *pNode); +int32_t raftStoreOpen(SSyncNode *pNode); +void raftStoreClose(SSyncNode *pNode); bool raftStoreHasVoted(SSyncNode *pNode); void raftStoreVote(SSyncNode *pNode, SRaftId *pRaftId); void raftStoreClearVote(SSyncNode *pNode); void raftStoreNextTerm(SSyncNode *pNode); void raftStoreSetTerm(SSyncNode *pNode, SyncTerm term); +SyncTerm raftStoreGetTerm(SSyncNode *pNode); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 835e5d248e345cbbb3206e35d67ddd20717009db..b04bcb86c69de5088e53f2852d24430a722979a0 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -120,17 +120,17 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) { // prepare response msg pReply->srcId = ths->myRaftId; pReply->destId = pMsg->srcId; - pReply->term = ths->raftStore.currentTerm; + pReply->term = raftStoreGetTerm(ths); pReply->success = false; pReply->matchIndex = SYNC_INDEX_INVALID; pReply->lastSendIndex = pMsg->prevLogIndex + 1; pReply->startTime = ths->startTime; - if (pMsg->term < ths->raftStore.currentTerm) { + if (pMsg->term < raftStoreGetTerm(ths)) { goto _SEND_RESPONSE; } - if (pMsg->term > ths->raftStore.currentTerm) { + if (pMsg->term > raftStoreGetTerm(ths)) { pReply->term = pMsg->term; } diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 44a29da3ea0e54d4e9932183a67d298a9c6239ed..f81699b9f6de4f0d7881cee7d42c6dfefd7acd71 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -50,19 +50,19 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { } // drop stale response - if (pMsg->term < ths->raftStore.currentTerm) { + if (pMsg->term < raftStoreGetTerm(ths)) { syncLogRecvAppendEntriesReply(ths, pMsg, "drop stale response"); return 0; } if (ths->state == TAOS_SYNC_STATE_LEADER) { - if (pMsg->term > ths->raftStore.currentTerm) { + if (pMsg->term > raftStoreGetTerm(ths)) { syncLogRecvAppendEntriesReply(ths, pMsg, "error term"); syncNodeStepDown(ths, pMsg->term); return -1; } - ASSERT(pMsg->term == ths->raftStore.currentTerm); + ASSERT(pMsg->term == raftStoreGetTerm(ths)); sTrace("vgId:%d, received append entries reply. srcId:0x%016" PRIx64 ", term:%" PRId64 ", matchIndex:%" PRId64 "", pMsg->vgId, pMsg->srcId.addr, pMsg->term, pMsg->matchIndex); diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 6d256a735de387ac10523580b7d25508d8c10a80..2501b4df8b813f438a4724097fff8eb15228b5e6 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -111,7 +111,7 @@ int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely) { SyncIndex commitIndex = indexLikely; syncNodeUpdateCommitIndex(ths, commitIndex); sTrace("vgId:%d, agreed upon. role:%d, term:%" PRId64 ", index:%" PRId64 "", ths->vgId, ths->state, - ths->raftStore.currentTerm, commitIndex); + raftStoreGetTerm(ths), commitIndex); } return ths->commitIndex; } diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index 682ace83ecfa99e4781f70915048cf62a5e2d76f..e53b8ade1ca71281f4ec2536195456def7882781 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -51,7 +51,7 @@ static int32_t syncNodeRequestVotePeers(SSyncNode* pNode) { SyncRequestVote* pMsg = rpcMsg.pCont; pMsg->srcId = pNode->myRaftId; pMsg->destId = pNode->peersId[i]; - pMsg->term = pNode->raftStore.currentTerm; + pMsg->term = raftStoreGetTerm(pNode); ret = syncNodeGetLastIndexTerm(pNode, &pMsg->lastLogIndex, &pMsg->lastLogTerm); if (ret < 0) { @@ -85,10 +85,12 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) { // start election raftStoreNextTerm(pSyncNode); raftStoreClearVote(pSyncNode); - voteGrantedReset(pSyncNode->pVotesGranted, pSyncNode->raftStore.currentTerm); - votesRespondReset(pSyncNode->pVotesRespond, pSyncNode->raftStore.currentTerm); - syncNodeVoteForSelf(pSyncNode); + SyncTerm currentTerm = raftStoreGetTerm(pSyncNode); + voteGrantedReset(pSyncNode->pVotesGranted, currentTerm); + votesRespondReset(pSyncNode->pVotesRespond, currentTerm); + syncNodeVoteForSelf(pSyncNode, currentTerm); + if (voteGrantedMajority(pSyncNode->pVotesGranted)) { // only myself, to leader ASSERT(!pSyncNode->pVotesGranted->toLeader); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 803df6b662f93f2ab0281e859705397e81c1d41a..7f7a3f113b910eaa8484721dd232dd7c071a6aec 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -41,7 +41,6 @@ static void syncNodeEqPingTimer(void* param, void* tmrId); static void syncNodeEqElectTimer(void* param, void* tmrId); static void syncNodeEqHeartbeatTimer(void* param, void* tmrId); -static int32_t syncNodeEqNoop(SSyncNode* ths); static int32_t syncNodeAppendNoop(SSyncNode* ths); static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId); static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg); @@ -468,7 +467,7 @@ bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) { } if (code == 0 && pEntry != NULL) { - if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == pSyncNode->raftStore.currentTerm) { + if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == raftStoreGetTerm(pSyncNode)) { ready = true; } @@ -664,7 +663,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_ int32_t code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex); if (code == 0) { pMsg->info.conn.applyIndex = retIndex; - pMsg->info.conn.applyTerm = pSyncNode->raftStore.currentTerm; + pMsg->info.conn.applyTerm = raftStoreGetTerm(pSyncNode); sTrace("vgId:%d, propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex, TMSG_INFO(pMsg->msgType)); return 1; @@ -911,7 +910,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { // init TLA+ server vars pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; - if (raftStoreReadFile(pSyncNode) != 0) { + if (raftStoreOpen(pSyncNode) != 0) { sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath); goto _error; } @@ -1212,7 +1211,12 @@ void syncNodeClose(SSyncNode* pSyncNode) { if (pSyncNode == NULL) return; sNInfo(pSyncNode, "sync close, node:%p", pSyncNode); + syncNodeStopPingTimer(pSyncNode); + syncNodeStopElectTimer(pSyncNode); + syncNodeStopHeartbeatTimer(pSyncNode); + syncNodeLogReplMgrDestroy(pSyncNode); + syncRespMgrDestroy(pSyncNode->pSyncRespMgr); pSyncNode->pSyncRespMgr = NULL; voteGrantedDestroy(pSyncNode->pVotesGranted); @@ -1228,10 +1232,6 @@ void syncNodeClose(SSyncNode* pSyncNode) { syncLogBufferDestroy(pSyncNode->pLogBuf); pSyncNode->pLogBuf = NULL; - syncNodeStopPingTimer(pSyncNode); - syncNodeStopElectTimer(pSyncNode); - syncNodeStopHeartbeatTimer(pSyncNode); - for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) { if (pSyncNode->senders[i] != NULL) { sDebug("vgId:%d, snapshot sender destroy while close, data:%p", pSyncNode->vgId, pSyncNode->senders[i]); @@ -1259,6 +1259,8 @@ void syncNodeClose(SSyncNode* pSyncNode) { taosMemoryFree(pSyncNode->pFsm); } + raftStoreClose(pSyncNode); + taosMemoryFree(pSyncNode); } @@ -1633,7 +1635,7 @@ _END: // raft state change -------------- void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { - if (term > pSyncNode->raftStore.currentTerm) { + if (term > raftStoreGetTerm(pSyncNode)) { raftStoreSetTerm(pSyncNode, term); char tmpBuf[64]; snprintf(tmpBuf, sizeof(tmpBuf), "update term to %" PRId64, term); @@ -1643,24 +1645,23 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { } void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) { - if (term > pSyncNode->raftStore.currentTerm) { + if (term > raftStoreGetTerm(pSyncNode)) { raftStoreSetTerm(pSyncNode, term); } } void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) { - if (pSyncNode->raftStore.currentTerm > newTerm) { - sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, - pSyncNode->raftStore.currentTerm); + SyncTerm currentTerm = raftStoreGetTerm(pSyncNode); + if (currentTerm > newTerm) { + sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm); return; } do { - sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, - pSyncNode->raftStore.currentTerm); + sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm); } while (0); - if (pSyncNode->raftStore.currentTerm < newTerm) { + if (currentTerm < newTerm) { raftStoreSetTerm(pSyncNode, newTerm); char tmpBuf[64]; snprintf(tmpBuf, sizeof(tmpBuf), "step down, update term to %" PRId64, newTerm); @@ -1820,8 +1821,8 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) { SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); ASSERT(lastIndex >= 0); - sInfo("vgId:%d, become leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64 "", - pSyncNode->vgId, pSyncNode->raftStore.currentTerm, pSyncNode->commitIndex, lastIndex); + sInfo("vgId:%d, become leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64 "", pSyncNode->vgId, + raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex); } bool syncNodeIsMnode(SSyncNode* pSyncNode) { return (pSyncNode->vgId == 1); } @@ -1840,7 +1841,7 @@ void syncNodeFollower2Candidate(SSyncNode* pSyncNode) { pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE; SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); sInfo("vgId:%d, become candidate from follower. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64, - pSyncNode->vgId, pSyncNode->raftStore.currentTerm, pSyncNode->commitIndex, lastIndex); + pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex); sNTrace(pSyncNode, "follower to candidate"); } @@ -1850,7 +1851,7 @@ void syncNodeLeader2Follower(SSyncNode* pSyncNode) { syncNodeBecomeFollower(pSyncNode, "leader to follower"); SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); sInfo("vgId:%d, become follower from leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64, - pSyncNode->vgId, pSyncNode->raftStore.currentTerm, pSyncNode->commitIndex, lastIndex); + pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex); sNTrace(pSyncNode, "leader to follower"); } @@ -1860,7 +1861,7 @@ void syncNodeCandidate2Follower(SSyncNode* pSyncNode) { syncNodeBecomeFollower(pSyncNode, "candidate to follower"); SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); sInfo("vgId:%d, become follower from candidate. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64, - pSyncNode->vgId, pSyncNode->raftStore.currentTerm, pSyncNode->commitIndex, lastIndex); + pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex); sNTrace(pSyncNode, "candidate to follower"); } @@ -1868,7 +1869,7 @@ void syncNodeCandidate2Follower(SSyncNode* pSyncNode) { // just called by syncNodeVoteForSelf // need assert void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) { - ASSERT(term == pSyncNode->raftStore.currentTerm); + ASSERT(term == raftStoreGetTerm(pSyncNode)); bool voted = raftStoreHasVoted(pSyncNode); ASSERT(!voted); @@ -1876,8 +1877,8 @@ void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) } // simulate get vote from outside -void syncNodeVoteForSelf(SSyncNode* pSyncNode) { - syncNodeVoteForTerm(pSyncNode, pSyncNode->raftStore.currentTerm, &pSyncNode->myRaftId); +void syncNodeVoteForSelf(SSyncNode* pSyncNode, SyncTerm currentTerm) { + syncNodeVoteForTerm(pSyncNode, currentTerm, &pSyncNode->myRaftId); SRpcMsg rpcMsg = {0}; int32_t ret = syncBuildRequestVoteReply(&rpcMsg, pSyncNode->vgId); @@ -1886,7 +1887,7 @@ void syncNodeVoteForSelf(SSyncNode* pSyncNode) { SyncRequestVoteReply* pMsg = rpcMsg.pCont; pMsg->srcId = pSyncNode->myRaftId; pMsg->destId = pSyncNode->myRaftId; - pMsg->term = pSyncNode->raftStore.currentTerm; + pMsg->term = currentTerm; pMsg->voteGranted = true; voteGrantedVote(pSyncNode->pVotesGranted, pMsg); @@ -2199,7 +2200,7 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { SyncHeartbeat* pSyncMsg = rpcMsg.pCont; pSyncMsg->srcId = pSyncNode->myRaftId; pSyncMsg->destId = pData->destId; - pSyncMsg->term = pSyncNode->raftStore.currentTerm; + pSyncMsg->term = raftStoreGetTerm(pSyncNode); pSyncMsg->commitIndex = pSyncNode->commitIndex; pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode); pSyncMsg->privateTerm = 0; @@ -2238,30 +2239,6 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { syncNodeRelease(pSyncNode); } -static int32_t syncNodeEqNoop(SSyncNode* pNode) { - if (pNode->state == TAOS_SYNC_STATE_LEADER) { - terrno = TSDB_CODE_SYN_NOT_LEADER; - return -1; - } - - SyncIndex index = pNode->pLogStore->syncLogWriteIndex(pNode->pLogStore); - SyncTerm term = pNode->raftStore.currentTerm; - SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, pNode->vgId); - if (pEntry == NULL) return -1; - - SRpcMsg rpcMsg = {0}; - int32_t code = syncBuildClientRequestFromNoopEntry(&rpcMsg, pEntry, pNode->vgId); - syncEntryDestroy(pEntry); - - sNTrace(pNode, "propose msg, type:noop"); - code = (*pNode->syncEqMsg)(pNode->msgcb, &rpcMsg); - if (code != 0) { - sError("failed to propose noop msg while enqueue since %s", terrstr()); - } - - return code; -} - static void deleteCacheEntry(const void* key, size_t keyLen, void* value) { taosMemoryFree(value); } int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) { @@ -2291,7 +2268,7 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) { if (syncLogBufferAppend(ths->pLogBuf, ths, pEntry) < 0) { sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index); terrno = TSDB_CODE_SYN_BUFFER_FULL; - (void)syncLogFsmExecute(ths, ths->pFsm, ths->state, ths->raftStore.currentTerm, pEntry, TSDB_CODE_SYN_BUFFER_FULL); + (void)syncLogFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, TSDB_CODE_SYN_BUFFER_FULL); syncEntryDestroy(pEntry); return -1; } @@ -2364,7 +2341,7 @@ bool syncNodeSnapshotRecving(SSyncNode* pSyncNode) { static int32_t syncNodeAppendNoop(SSyncNode* ths) { SyncIndex index = syncLogBufferGetEndIndex(ths->pLogBuf); - SyncTerm term = ths->raftStore.currentTerm; + SyncTerm term = raftStoreGetTerm(ths); SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId); if (pEntry == NULL) { @@ -2380,7 +2357,7 @@ static int32_t syncNodeAppendNoopOld(SSyncNode* ths) { int32_t ret = 0; SyncIndex index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore); - SyncTerm term = ths->raftStore.currentTerm; + SyncTerm term = raftStoreGetTerm(ths); SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId); ASSERT(pEntry != NULL); @@ -2418,16 +2395,17 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { SRpcMsg rpcMsg = {0}; (void)syncBuildHeartbeatReply(&rpcMsg, ths->vgId); + SyncTerm currentTerm = raftStoreGetTerm(ths); SyncHeartbeatReply* pMsgReply = rpcMsg.pCont; pMsgReply->destId = pMsg->srcId; pMsgReply->srcId = ths->myRaftId; - pMsgReply->term = ths->raftStore.currentTerm; + pMsgReply->term = currentTerm; pMsgReply->privateTerm = 8864; // magic number pMsgReply->startTime = ths->startTime; pMsgReply->timeStamp = tsMs; - if (pMsg->term == ths->raftStore.currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) { + if (pMsg->term == currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) { syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs); syncNodeResetElectTimer(ths); @@ -2456,7 +2434,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { } } - if (pMsg->term >= ths->raftStore.currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) { + if (pMsg->term >= currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) { // syncNodeStepDown(ths, pMsg->term); SRpcMsg rpcMsgLocalCmd = {0}; (void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId); @@ -2565,7 +2543,7 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn int32_t code = 0; SyncIndex index = syncLogBufferGetEndIndex(ths->pLogBuf); - SyncTerm term = ths->raftStore.currentTerm; + SyncTerm term = raftStoreGetTerm(ths); SSyncRaftEntry* pEntry = NULL; if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { pEntry = syncEntryBuildFromClientRequest(pMsg->pCont, term, index); @@ -2609,73 +2587,6 @@ const char* syncStr(ESyncState state) { } } -#if 0 -int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) { - if (ths->state != TAOS_SYNC_STATE_FOLLOWER) { - sNTrace(ths, "I am not follower, can not do leader transfer"); - return 0; - } - - if (!ths->restoreFinish) { - sNTrace(ths, "restore not finish, can not do leader transfer"); - return 0; - } - - if (pEntry->term < ths->raftStore.currentTerm) { - sNTrace(ths, "little term:%" PRId64 ", can not do leader transfer", pEntry->term); - return 0; - } - - if (pEntry->index < syncNodeGetLastIndex(ths)) { - sNTrace(ths, "little index:%" PRId64 ", can not do leader transfer", pEntry->index); - return 0; - } - - /* - if (ths->vgId > 1) { - sNTrace(ths, "I am vnode, can not do leader transfer"); - return 0; - } - */ - - SyncLeaderTransfer* pSyncLeaderTransfer = pRpcMsg->pCont; - sNTrace(ths, "do leader transfer, index:%" PRId64, pEntry->index); - - bool sameId = syncUtilSameId(&(pSyncLeaderTransfer->newLeaderId), &(ths->myRaftId)); - bool sameNodeInfo = strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 && - pSyncLeaderTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort; - - bool same = sameId || sameNodeInfo; - if (same) { - // reset elect timer now! - int32_t electMS = 1; - int32_t ret = syncNodeRestartElectTimer(ths, electMS); - ASSERT(ret == 0); - - sNTrace(ths, "maybe leader transfer to %s:%d %" PRId64, pSyncLeaderTransfer->newNodeInfo.nodeFqdn, - pSyncLeaderTransfer->newNodeInfo.nodePort, pSyncLeaderTransfer->newLeaderId.addr); - } - - if (ths->pFsm->FpLeaderTransferCb != NULL) { - SFsmCbMeta cbMeta = { - .code = 0, - .currentTerm = ths->raftStore.currentTerm, - .flag = 0, - .index = pEntry->index, - .lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index), - .isWeak = pEntry->isWeak, - .seqNum = pEntry->seqNum, - .state = ths->state, - .term = pEntry->term, - }; - ths->pFsm->FpLeaderTransferCb(ths->pFsm, pRpcMsg, &cbMeta); - } - - return 0; -} - -#endif - int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) { for (int32_t i = 0; i < pNewCfg->replicaNum; ++i) { SRaftId raftId = { diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 7d534c671e6c80453f8a3b47fffebcf8028e062e..2a44588eefc2287bce3cfc310907c081f32338fb 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -176,7 +176,7 @@ int32_t syncBuildAppendEntriesFromRaftEntry(SSyncNode* pNode, SSyncRaftEntry* pE pMsg->prevLogTerm = prevLogTerm; pMsg->vgId = pNode->vgId; pMsg->srcId = pNode->myRaftId; - pMsg->term = pNode->raftStore.currentTerm; + pMsg->term = raftStoreGetTerm(pNode); pMsg->commitIndex = pNode->commitIndex; pMsg->privateTerm = 0; return 0; diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index b3eb5684cfc1435384ba656322e5f656cdbfc2de..0a34c370daace02e0084d5b7cfb1adee1c883f6d 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -61,6 +61,7 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt SSyncRaftEntry* pMatch = pBuf->entries[(index - 1 + pBuf->size) % pBuf->size].pItem; ASSERTS(pMatch != NULL, "no matched log entry"); ASSERT(pMatch->index + 1 == index); + ASSERT(pMatch->term <= pEntry->term); SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = pMatch->index, .prevLogTerm = pMatch->term}; pBuf->entries[index % pBuf->size] = tmp; @@ -514,7 +515,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm SSyncLogStore* pLogStore = pNode->pLogStore; SSyncFSM* pFsm = pNode->pFsm; ESyncState role = pNode->state; - SyncTerm term = pNode->raftStore.currentTerm; + SyncTerm currentTerm = raftStoreGetTerm(pNode); SyncGroupId vgId = pNode->vgId; int32_t ret = -1; int64_t upperIndex = TMIN(commitIndex, pBuf->matchIndex); @@ -529,7 +530,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm } sTrace("vgId:%d, commit. log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 "), role:%d, term:%" PRId64, - pNode->vgId, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex, role, term); + pNode->vgId, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex, role, currentTerm); // execute in fsm for (int64_t index = pBuf->commitIndex + 1; index <= upperIndex; index++) { @@ -545,16 +546,16 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm pEntry->term, TMSG_INFO(pEntry->originalRpcType)); } - if (syncLogFsmExecute(pNode, pFsm, role, term, pEntry, 0) != 0) { + if (syncLogFsmExecute(pNode, pFsm, role, currentTerm, pEntry, 0) != 0) { sError("vgId:%d, failed to execute sync log entry. index:%" PRId64 ", term:%" PRId64 ", role:%d, current term:%" PRId64, - vgId, pEntry->index, pEntry->term, role, term); + vgId, pEntry->index, pEntry->term, role, currentTerm); goto _out; } pBuf->commitIndex = index; sTrace("vgId:%d, committed index:%" PRId64 ", term:%" PRId64 ", role:%d, current term:%" PRId64 "", pNode->vgId, - pEntry->index, pEntry->term, role, term); + pEntry->index, pEntry->term, role, currentTerm); if (!inBuf) { syncEntryDestroy(pEntry); @@ -576,7 +577,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm _out: // mark as restored if needed if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex && pEntry != NULL && - pNode->raftStore.currentTerm <= pEntry->term) { + currentTerm <= pEntry->term) { pNode->pFsm->FpRestoreFinishCb(pNode->pFsm); pNode->restoreFinish = true; sInfo("vgId:%d, restore finished. log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, diff --git a/source/libs/sync/src/syncRaftStore.c b/source/libs/sync/src/syncRaftStore.c index 197d1463fd80791403425adaed75ab31d380e4bb..68e735cf0d7e6689d4724d2439804a1379330f61 100644 --- a/source/libs/sync/src/syncRaftStore.c +++ b/source/libs/sync/src/syncRaftStore.c @@ -18,6 +18,9 @@ #include "syncUtil.h" #include "tjson.h" +int32_t raftStoreReadFile(SSyncNode *pNode); +int32_t raftStoreWriteFile(SSyncNode *pNode); + static int32_t raftStoreDecode(const SJson *pJson, SRaftStore *pStore) { int32_t code = 0; @@ -150,27 +153,53 @@ _OVER: return code; } +int32_t raftStoreOpen(SSyncNode *pNode) { + taosThreadMutexInit(&pNode->raftStore.mutex, NULL); + return raftStoreReadFile(pNode); +} + +void raftStoreClose(SSyncNode *pNode) { taosThreadMutexDestroy(&pNode->raftStore.mutex); } + bool raftStoreHasVoted(SSyncNode *pNode) { + taosThreadMutexLock(&pNode->raftStore.mutex); bool b = syncUtilEmptyId(&pNode->raftStore.voteFor); + taosThreadMutexUnlock(&pNode->raftStore.mutex); return (!b); } void raftStoreVote(SSyncNode *pNode, SRaftId *pRaftId) { + taosThreadMutexLock(&pNode->raftStore.mutex); pNode->raftStore.voteFor = *pRaftId; (void)raftStoreWriteFile(pNode); + taosThreadMutexUnlock(&pNode->raftStore.mutex); } void raftStoreClearVote(SSyncNode *pNode) { + taosThreadMutexLock(&pNode->raftStore.mutex); pNode->raftStore.voteFor = EMPTY_RAFT_ID; (void)raftStoreWriteFile(pNode); + taosThreadMutexUnlock(&pNode->raftStore.mutex); } void raftStoreNextTerm(SSyncNode *pNode) { + taosThreadMutexLock(&pNode->raftStore.mutex); pNode->raftStore.currentTerm++; (void)raftStoreWriteFile(pNode); + taosThreadMutexUnlock(&pNode->raftStore.mutex); } void raftStoreSetTerm(SSyncNode *pNode, SyncTerm term) { - pNode->raftStore.currentTerm = term; - (void)raftStoreWriteFile(pNode); + taosThreadMutexLock(&pNode->raftStore.mutex); + if (pNode->raftStore.currentTerm < term) { + pNode->raftStore.currentTerm = term; + (void)raftStoreWriteFile(pNode); + } + taosThreadMutexUnlock(&pNode->raftStore.mutex); +} + +SyncTerm raftStoreGetTerm(SSyncNode *pNode) { + taosThreadMutexLock(&pNode->raftStore.mutex); + SyncTerm term = pNode->raftStore.currentTerm; + taosThreadMutexUnlock(&pNode->raftStore.mutex); + return term; } diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 3df203221b88bbd0b9d804613e2ea50e881be149..8cdf821cffa1d76a9985b8d2515a439a7f93cdf7 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -107,7 +107,7 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) { SyncHeartbeat* pSyncMsg = rpcMsg.pCont; pSyncMsg->srcId = pSyncNode->myRaftId; pSyncMsg->destId = pSyncNode->peersId[i]; - pSyncMsg->term = pSyncNode->raftStore.currentTerm; + pSyncMsg->term = raftStoreGetTerm(pSyncNode); pSyncMsg->commitIndex = pSyncNode->commitIndex; pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode); pSyncMsg->privateTerm = 0; diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index 069ea2ea88b85c91846b975513dcd681944c7ace..2fda2a19b8dad08a49d35e8d0492f7e3259fbffe 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -97,15 +97,14 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { } bool logOK = syncNodeOnRequestVoteLogOK(ths, pMsg); - // maybe update term - if (pMsg->term > ths->raftStore.currentTerm) { + if (pMsg->term > raftStoreGetTerm(ths)) { syncNodeStepDown(ths, pMsg->term); - // syncNodeUpdateTerm(ths, pMsg->term); } - ASSERT(pMsg->term <= ths->raftStore.currentTerm); + SyncTerm currentTerm = raftStoreGetTerm(ths); + ASSERT(pMsg->term <= currentTerm); - bool grant = (pMsg->term == ths->raftStore.currentTerm) && logOK && + bool grant = (pMsg->term == currentTerm) && logOK && ((!raftStoreHasVoted(ths)) || (syncUtilSameId(&ths->raftStore.voteFor, &pMsg->srcId))); if (grant) { // maybe has already voted for pMsg->srcId @@ -113,7 +112,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { raftStoreVote(ths, &(pMsg->srcId)); // candidate ? - syncNodeStepDown(ths, ths->raftStore.currentTerm); + syncNodeStepDown(ths, currentTerm); // forbid elect for this round syncNodeResetElectTimer(ths); @@ -127,8 +126,9 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { SyncRequestVoteReply* pReply = rpcMsg.pCont; pReply->srcId = ths->myRaftId; pReply->destId = pMsg->srcId; - pReply->term = ths->raftStore.currentTerm; + pReply->term = currentTerm; pReply->voteGranted = grant; + ASSERT(!grant || pMsg->term == pReply->term); // trace log syncLogRecvRequestVote(ths, pMsg, pReply->voteGranted, ""); diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c index a0d6cbf597bb95c9d0e73b9cb9cde9f7c94d4a98..25c9f813a6fe6dc173569dfed194085a903fabf4 100644 --- a/source/libs/sync/src/syncRequestVoteReply.c +++ b/source/libs/sync/src/syncRequestVoteReply.c @@ -47,27 +47,21 @@ int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { syncLogRecvRequestVoteReply(ths, pMsg, "not in my config"); return -1; } - + SyncTerm currentTerm = raftStoreGetTerm(ths); // drop stale response - if (pMsg->term < ths->raftStore.currentTerm) { + if (pMsg->term < currentTerm) { syncLogRecvRequestVoteReply(ths, pMsg, "drop stale response"); return -1; } - // ASSERT(!(pMsg->term > ths->raftStore.currentTerm)); - // no need this code, because if I receive reply.term, then I must have sent for that term. - // if (pMsg->term > ths->raftStore.currentTerm) { - // syncNodeUpdateTerm(ths, pMsg->term); - // } - - if (pMsg->term > ths->raftStore.currentTerm) { + if (pMsg->term > currentTerm) { syncLogRecvRequestVoteReply(ths, pMsg, "error term"); syncNodeStepDown(ths, pMsg->term); return -1; } syncLogRecvRequestVoteReply(ths, pMsg, ""); - ASSERT(pMsg->term == ths->raftStore.currentTerm); + ASSERT(pMsg->term == currentTerm); // This tallies votes even when the current state is not Candidate, // but they won't be looked at, so it doesn't matter. diff --git a/source/libs/sync/src/syncRespMgr.c b/source/libs/sync/src/syncRespMgr.c index 9373eccaef6b27fd451fa77b360972e47cbea3d7..f9f14c2e00816817da17eb954bd6ee4278d8b51d 100644 --- a/source/libs/sync/src/syncRespMgr.c +++ b/source/libs/sync/src/syncRespMgr.c @@ -143,7 +143,7 @@ static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) { .state = pNode->state, .seqNum = *pSeqNum, .term = SYNC_TERM_INVALID, - .currentTerm = pNode->raftStore.currentTerm, + .currentTerm = SYNC_TERM_INVALID, .flag = 0, }; diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 18f263cc95f7bb9f611cb0a81343b60e7e36d95b..a83a19928e0a38bdbdd56e9ffc9fdb1927a9ac77 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -43,7 +43,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS; pSender->pSyncNode = pSyncNode; pSender->replicaIndex = replicaIndex; - pSender->term = pSyncNode->raftStore.currentTerm; + pSender->term = raftStoreGetTerm(pSyncNode); pSender->startTime = 0; pSender->endTime = 0; pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot); @@ -90,7 +90,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { memset(&pSender->lastConfig, 0, sizeof(pSender->lastConfig)); pSender->sendingMS = 0; - pSender->term = pSender->pSyncNode->raftStore.currentTerm; + pSender->term = raftStoreGetTerm(pSender->pSyncNode); pSender->startTime = taosGetTimestampMs(); pSender->lastSendTime = pSender->startTime; pSender->finish = false; @@ -105,7 +105,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { SyncSnapshotSend *pMsg = rpcMsg.pCont; pMsg->srcId = pSender->pSyncNode->myRaftId; pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; - pMsg->term = pSender->pSyncNode->raftStore.currentTerm; + pMsg->term = raftStoreGetTerm(pSender->pSyncNode); pMsg->beginIndex = pSender->snapshotParam.start; pMsg->lastIndex = pSender->snapshot.lastApplyIndex; pMsg->lastTerm = pSender->snapshot.lastApplyTerm; @@ -185,7 +185,7 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) { SyncSnapshotSend *pMsg = rpcMsg.pCont; pMsg->srcId = pSender->pSyncNode->myRaftId; pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; - pMsg->term = pSender->pSyncNode->raftStore.currentTerm; + pMsg->term = raftStoreGetTerm(pSender->pSyncNode); pMsg->beginIndex = pSender->snapshotParam.start; pMsg->lastIndex = pSender->snapshot.lastApplyIndex; pMsg->lastTerm = pSender->snapshot.lastApplyTerm; @@ -226,7 +226,7 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { SyncSnapshotSend *pMsg = rpcMsg.pCont; pMsg->srcId = pSender->pSyncNode->myRaftId; pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; - pMsg->term = pSender->pSyncNode->raftStore.currentTerm; + pMsg->term = raftStoreGetTerm(pSender->pSyncNode); pMsg->beginIndex = pSender->snapshotParam.start; pMsg->lastIndex = pSender->snapshot.lastApplyIndex; pMsg->lastTerm = pSender->snapshot.lastApplyTerm; @@ -314,7 +314,7 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from pReceiver->pWriter = NULL; pReceiver->pSyncNode = pSyncNode; pReceiver->fromId = fromId; - pReceiver->term = pSyncNode->raftStore.currentTerm; + pReceiver->term = raftStoreGetTerm(pSyncNode); pReceiver->snapshot.data = NULL; pReceiver->snapshot.lastApplyIndex = SYNC_INDEX_INVALID; pReceiver->snapshot.lastApplyTerm = 0; @@ -380,7 +380,7 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *p pReceiver->start = true; pReceiver->ack = SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT; - pReceiver->term = pReceiver->pSyncNode->raftStore.currentTerm; + pReceiver->term = raftStoreGetTerm(pReceiver->pSyncNode); pReceiver->fromId = pPreMsg->srcId; pReceiver->startTime = pPreMsg->startTime; @@ -437,9 +437,8 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap } // maybe update term - if (pReceiver->snapshot.lastApplyTerm > pReceiver->pSyncNode->raftStore.currentTerm) { - pReceiver->pSyncNode->raftStore.currentTerm = pReceiver->snapshot.lastApplyTerm; - (void)raftStoreWriteFile(pReceiver->pSyncNode); + if (pReceiver->snapshot.lastApplyTerm > raftStoreGetTerm(pReceiver->pSyncNode)) { + raftStoreSetTerm(pReceiver->pSyncNode, pReceiver->snapshot.lastApplyTerm); } // stop writer, apply data @@ -584,7 +583,7 @@ _SEND_REPLY: SyncSnapshotRsp *pRspMsg = rpcMsg.pCont; pRspMsg->srcId = pSyncNode->myRaftId; pRspMsg->destId = pMsg->srcId; - pRspMsg->term = pSyncNode->raftStore.currentTerm; + pRspMsg->term = raftStoreGetTerm(pSyncNode); pRspMsg->lastIndex = pMsg->lastIndex; pRspMsg->lastTerm = pMsg->lastTerm; pRspMsg->startTime = pReceiver->startTime; @@ -640,7 +639,7 @@ _SEND_REPLY: SyncSnapshotRsp *pRspMsg = rpcMsg.pCont; pRspMsg->srcId = pSyncNode->myRaftId; pRspMsg->destId = pMsg->srcId; - pRspMsg->term = pSyncNode->raftStore.currentTerm; + pRspMsg->term = raftStoreGetTerm(pSyncNode); pRspMsg->lastIndex = pMsg->lastIndex; pRspMsg->lastTerm = pMsg->lastTerm; pRspMsg->startTime = pReceiver->startTime; @@ -690,7 +689,7 @@ static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend SyncSnapshotRsp *pRspMsg = rpcMsg.pCont; pRspMsg->srcId = pSyncNode->myRaftId; pRspMsg->destId = pMsg->srcId; - pRspMsg->term = pSyncNode->raftStore.currentTerm; + pRspMsg->term = raftStoreGetTerm(pSyncNode); pRspMsg->lastIndex = pMsg->lastIndex; pRspMsg->lastTerm = pMsg->lastTerm; pRspMsg->startTime = pReceiver->startTime; @@ -737,7 +736,7 @@ static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMs SyncSnapshotRsp *pRspMsg = rpcMsg.pCont; pRspMsg->srcId = pSyncNode->myRaftId; pRspMsg->destId = pMsg->srcId; - pRspMsg->term = pSyncNode->raftStore.currentTerm; + pRspMsg->term = raftStoreGetTerm(pSyncNode); pRspMsg->lastIndex = pMsg->lastIndex; pRspMsg->lastTerm = pMsg->lastTerm; pRspMsg->startTime = pReceiver->startTime; @@ -786,13 +785,13 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { return -1; } - if (pMsg->term < pSyncNode->raftStore.currentTerm) { + if (pMsg->term < raftStoreGetTerm(pSyncNode)) { syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "reject since small term"); terrno = TSDB_CODE_SYN_INTERNAL_ERROR; return -1; } - if (pMsg->term > pSyncNode->raftStore.currentTerm) { + if (pMsg->term > raftStoreGetTerm(pSyncNode)) { syncNodeStepDown(pSyncNode, pMsg->term); } syncNodeResetElectTimer(pSyncNode); @@ -800,7 +799,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { // state, term, seq/ack int32_t code = 0; if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) { - if (pMsg->term == pSyncNode->raftStore.currentTerm) { + if (pMsg->term == raftStoreGetTerm(pSyncNode)) { if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT) { syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq pre-snapshot"); code = syncNodeOnSnapshotPrep(pSyncNode, pMsg); @@ -884,7 +883,7 @@ static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSend SyncSnapshotSend *pSendMsg = rpcMsg.pCont; pSendMsg->srcId = pSender->pSyncNode->myRaftId; pSendMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; - pSendMsg->term = pSender->pSyncNode->raftStore.currentTerm; + pSendMsg->term = raftStoreGetTerm(pSender->pSyncNode); pSendMsg->beginIndex = pSender->snapshotParam.start; pSendMsg->lastIndex = pSender->snapshot.lastApplyIndex; pSendMsg->lastTerm = pSender->snapshot.lastApplyTerm; @@ -943,10 +942,11 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { goto _ERROR; } - if (pMsg->term != pSyncNode->raftStore.currentTerm) { + SyncTerm currentTerm = raftStoreGetTerm(pSyncNode); + if (pMsg->term != currentTerm) { syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender and receiver term not match"); sSError(pSender, "snapshot sender term not equal, msg term:%" PRId64 " currentTerm:%" PRId64, pMsg->term, - pSyncNode->raftStore.currentTerm); + currentTerm); terrno = TSDB_CODE_SYN_INTERNAL_ERROR; goto _ERROR; } diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 97641b8f414dd6afe1aa8243a9bd482f68a6b57c..a519c76cda53a5f66faeaefa17f56053c5d083e6 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -154,7 +154,7 @@ static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) { void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...) { if (pNode == NULL || pNode->pLogStore == NULL) return; - int64_t currentTerm = pNode->raftStore.currentTerm; + int64_t currentTerm = raftStoreGetTerm(pNode); // save error code, otherwise it will be overwritten int32_t errCode = terrno; @@ -260,7 +260,7 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla pNode->vgId, eventLog, syncStr(pNode->state), pSender, pSender->snapshotParam.start, pSender->snapshotParam.end, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack, pSender->finish, pSender->replicaIndex, - DID(&pNode->replicasId[pSender->replicaIndex]), pNode->raftStore.currentTerm, pNode->commitIndex, + DID(&pNode->replicasId[pSender->replicaIndex]), raftStoreGetTerm(pNode), pNode->commitIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->raftCfg.isStandBy, pNode->raftCfg.batchSize, pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock, @@ -308,7 +308,7 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df pNode->vgId, eventLog, syncStr(pNode->state), pReceiver, pReceiver->start, pReceiver->ack, pReceiver->term, pReceiver->startTime, DID(&pReceiver->fromId), pReceiver->snapshotParam.start, pReceiver->snapshotParam.end, pReceiver->snapshot.lastApplyIndex, pReceiver->snapshot.lastApplyTerm, pReceiver->snapshot.lastConfigIndex, - pNode->raftStore.currentTerm, pNode->commitIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex, + raftStoreGetTerm(pNode), pNode->commitIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->raftCfg.isStandBy, pNode->raftCfg.batchSize, pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr); diff --git a/source/libs/sync/test/sync_test_lib/src/syncMainDebug.c b/source/libs/sync/test/sync_test_lib/src/syncMainDebug.c index 1dbf4fb4fb70333d6a0cf0cb0a61138e58877504..18a75934fd6e5a5d96f63fad7d66b85f9c739932 100644 --- a/source/libs/sync/test/sync_test_lib/src/syncMainDebug.c +++ b/source/libs/sync/test/sync_test_lib/src/syncMainDebug.c @@ -199,7 +199,7 @@ inline char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { ", sby:%d, " "r-num:%d, " "lcfg:%" PRId64 ", chging:%d, rsto:%d", - pSyncNode->vgId, syncStr(pSyncNode->state), pSyncNode->raftStore.currentTerm, pSyncNode->commitIndex, + pSyncNode->vgId, syncStr(pSyncNode->state), raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, pSyncNode->raftCfg.isStandBy, pSyncNode->replicaNum, pSyncNode->raftCfg.lastConfigIndex, pSyncNode->changing, pSyncNode->restoreFinish); diff --git a/source/libs/sync/test/sync_test_lib/src/syncSnapshotDebug.c b/source/libs/sync/test/sync_test_lib/src/syncSnapshotDebug.c index d8740de16ae46214c8c5ed2ab2e1d622c003c300..2edcb0ad4dc4c8e593f842ee967a863b1b3019dd 100644 --- a/source/libs/sync/test/sync_test_lib/src/syncSnapshotDebug.c +++ b/source/libs/sync/test/sync_test_lib/src/syncSnapshotDebug.c @@ -137,7 +137,7 @@ int32_t syncNodeOnPreSnapshot(SSyncNode *ths, SyncPreSnapshot *pMsg) { SyncPreSnapshotReply *pMsgReply = syncPreSnapshotReplyBuild(ths->vgId); pMsgReply->srcId = ths->myRaftId; pMsgReply->destId = pMsg->srcId; - pMsgReply->term = ths->raftStore.currentTerm; + pMsgReply->term = raftStoreGetTerm(ths); SSyncLogStoreData *pData = ths->pLogStore->data; SWal *pWal = pData->pWal;