提交 2aeda3a9 编写于 作者: S Shengliang Guan

enh: refact raft store file

上级 f2d6ed86
......@@ -32,11 +32,9 @@ typedef struct SyncRequestVoteReply SyncRequestVoteReply;
typedef struct SyncAppendEntries SyncAppendEntries;
typedef struct SyncAppendEntriesReply SyncAppendEntriesReply;
typedef struct SSyncEnv SSyncEnv;
typedef struct SRaftStore SRaftStore;
typedef struct SVotesGranted SVotesGranted;
typedef struct SVotesRespond SVotesRespond;
typedef struct SSyncIndexMgr SSyncIndexMgr;
typedef struct SRaftCfg SRaftCfg;
typedef struct SSyncRespMgr SSyncRespMgr;
typedef struct SSyncSnapshotSender SSyncSnapshotSender;
typedef struct SSyncSnapshotReceiver SSyncSnapshotReceiver;
......@@ -70,6 +68,11 @@ typedef struct SRaftId {
SyncGroupId vgId;
} SRaftId;
typedef struct SRaftStore {
SyncTerm currentTerm;
SRaftId voteFor;
} SRaftStore;
typedef struct SSyncHbTimerData {
int64_t syncNodeRid;
SSyncTimer* pTimer;
......@@ -112,8 +115,8 @@ typedef struct SSyncNode {
// sync io
SSyncLogBuffer* pLogBuf;
SWal* pWal;
const SMsgCb* msgcb;
SWal* pWal;
const SMsgCb* msgcb;
int32_t (*syncSendMSg)(const SEpSet* pEpSet, SRpcMsg* pMsg);
int32_t (*syncEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
int32_t (*syncEqCtrlMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
......@@ -139,8 +142,8 @@ typedef struct SSyncNode {
int64_t rid;
// tla+ server vars
ESyncState state;
SRaftStore* pRaftStore;
ESyncState state;
SRaftStore raftStore;
// tla+ candidate vars
SVotesGranted* pVotesGranted;
......@@ -229,7 +232,7 @@ int32_t syncNodeStartStandBy(SSyncNode* pSyncNode);
void syncNodeClose(SSyncNode* pSyncNode);
void syncNodePreClose(SSyncNode* pSyncNode);
void syncNodePostClose(SSyncNode* pSyncNode);
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t *seq);
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq);
int32_t syncNodeRestore(SSyncNode* pSyncNode);
void syncHbTimerDataFree(SSyncHbTimerData* pData);
......@@ -24,27 +24,16 @@ extern "C" {
#define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0})
#define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0})
typedef struct SRaftStore {
SyncTerm currentTerm;
SRaftId voteFor;
TdFilePtr pFile;
} SRaftStore;
SRaftStore *raftStoreOpen(const char *path);
int32_t raftStoreClose(SRaftStore *pRaftStore);
int32_t raftStorePersist(SRaftStore *pRaftStore);
int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len);
int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len);
bool raftStoreHasVoted(SRaftStore *pRaftStore);
void raftStoreVote(SRaftStore *pRaftStore, SRaftId *pRaftId);
void raftStoreClearVote(SRaftStore *pRaftStore);
void raftStoreNextTerm(SRaftStore *pRaftStore);
void raftStoreSetTerm(SRaftStore *pRaftStore, SyncTerm term);
int32_t raftStoreReadFile(SSyncNode *pNode);
int32_t raftStoreWriteFile(SSyncNode *pNode);
bool raftStoreHasVoted(SSyncNode *pNode);
void raftStoreVote(SSyncNode *pNode, SRaftId *pRaftId);
void raftStoreClearVote(SSyncNode *pNode);
void raftStoreNextTerm(SSyncNode *pNode);
void raftStoreSetTerm(SSyncNode *pNode, SyncTerm term);
#ifdef __cplusplus
......@@ -159,17 +159,17 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
// prepare response msg
pReply->srcId = ths->myRaftId;
pReply->destId = pMsg->srcId;
pReply->term = ths->pRaftStore->currentTerm;
pReply->term = ths->raftStore.currentTerm;
pReply->success = false;
pReply->matchIndex = SYNC_INDEX_INVALID;
pReply->lastSendIndex = pMsg->prevLogIndex + 1;
pReply->startTime = ths->startTime;
if (pMsg->term < ths->pRaftStore->currentTerm) {
if (pMsg->term < ths->raftStore.currentTerm) {
if (pMsg->term > ths->pRaftStore->currentTerm) {
if (pMsg->term > ths->raftStore.currentTerm) {
pReply->term = pMsg->term;
......@@ -253,19 +253,19 @@ int32_t syncNodeOnAppendEntriesOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
SyncAppendEntriesReply* pReply = rpcRsp.pCont;
pReply->srcId = ths->myRaftId;
pReply->destId = pMsg->srcId;
pReply->term = ths->pRaftStore->currentTerm;
pReply->term = ths->raftStore.currentTerm;
pReply->success = false;
// pReply->matchIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
pReply->matchIndex = SYNC_INDEX_INVALID;
pReply->lastSendIndex = pMsg->prevLogIndex + 1;
pReply->startTime = ths->startTime;
if (pMsg->term < ths->pRaftStore->currentTerm) {
if (pMsg->term < ths->raftStore.currentTerm) {
syncLogRecvAppendEntries(ths, pMsg, "reject, small term");
if (pMsg->term > ths->pRaftStore->currentTerm) {
if (pMsg->term > ths->raftStore.currentTerm) {
pReply->term = pMsg->term;
......@@ -50,19 +50,19 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
// drop stale response
if (pMsg->term < ths->pRaftStore->currentTerm) {
if (pMsg->term < ths->raftStore.currentTerm) {
syncLogRecvAppendEntriesReply(ths, pMsg, "drop stale response");
return 0;
if (ths->state == TAOS_SYNC_STATE_LEADER) {
if (pMsg->term > ths->pRaftStore->currentTerm) {
if (pMsg->term > ths->raftStore.currentTerm) {
syncLogRecvAppendEntriesReply(ths, pMsg, "error term");
syncNodeStepDown(ths, pMsg->term);
return -1;
ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
ASSERT(pMsg->term == ths->raftStore.currentTerm);
sTrace("vgId:%d, received append entries reply. srcId:0x%016" PRIx64 ", term:%" PRId64 ", matchIndex:%" PRId64 "",
pMsg->vgId, pMsg->srcId.addr, pMsg->term, pMsg->matchIndex);
......@@ -100,19 +100,19 @@ int32_t syncNodeOnAppendEntriesReplyOld(SSyncNode* ths, SyncAppendEntriesReply*
// drop stale response
if (pMsg->term < ths->pRaftStore->currentTerm) {
if (pMsg->term < ths->raftStore.currentTerm) {
syncLogRecvAppendEntriesReply(ths, pMsg, "drop stale response");
return 0;
if (ths->state == TAOS_SYNC_STATE_LEADER) {
if (pMsg->term > ths->pRaftStore->currentTerm) {
if (pMsg->term > ths->raftStore.currentTerm) {
syncLogRecvAppendEntriesReply(ths, pMsg, "error term");
syncNodeStepDown(ths, pMsg->term);
return -1;
ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
ASSERT(pMsg->term == ths->raftStore.currentTerm);
if (pMsg->success) {
SyncIndex oldMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
......@@ -133,7 +133,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
// cannot commit, even if quorum agree. need check term!
if (pEntry->term <= pSyncNode->pRaftStore->currentTerm) {
if (pEntry->term <= pSyncNode->raftStore.currentTerm) {
// update commit index
newCommitIndex = index;
......@@ -329,7 +329,7 @@ int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely) {
SyncIndex commitIndex = indexLikely;
syncNodeUpdateCommitIndex(ths, commitIndex);
sTrace("vgId:%d, agreed upon. role:%d, term:%" PRId64 ", index: %" PRId64 "", ths->vgId, ths->state,
ths->pRaftStore->currentTerm, commitIndex);
ths->raftStore.currentTerm, commitIndex);
return ths->commitIndex;
......@@ -48,7 +48,7 @@ static int32_t syncNodeRequestVotePeers(SSyncNode* pNode) {
SyncRequestVote* pMsg = rpcMsg.pCont;
pMsg->srcId = pNode->myRaftId;
pMsg->destId = pNode->peersId[i];
pMsg->term = pNode->pRaftStore->currentTerm;
pMsg->term = pNode->raftStore.currentTerm;
ret = syncNodeGetLastIndexTerm(pNode, &pMsg->lastLogIndex, &pMsg->lastLogTerm);
ASSERT(ret == 0);
......@@ -75,10 +75,10 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) {
// start election
voteGrantedReset(pSyncNode->pVotesGranted, pSyncNode->pRaftStore->currentTerm);
votesRespondReset(pSyncNode->pVotesRespond, pSyncNode->pRaftStore->currentTerm);
voteGrantedReset(pSyncNode->pVotesGranted, pSyncNode->raftStore.currentTerm);
votesRespondReset(pSyncNode->pVotesRespond, pSyncNode->raftStore.currentTerm);
if (voteGrantedMajority(pSyncNode->pVotesGranted)) {
......@@ -468,7 +468,7 @@ bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) {
if (code == 0 && pEntry != NULL) {
if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == pSyncNode->pRaftStore->currentTerm) {
if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == pSyncNode->raftStore.currentTerm) {
ready = true;
......@@ -736,7 +736,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_
int32_t code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex);
if (code == 0) {
pMsg->info.conn.applyIndex = retIndex;
pMsg->info.conn.applyTerm = pSyncNode->pRaftStore->currentTerm;
pMsg->info.conn.applyTerm = pSyncNode->raftStore.currentTerm;
sTrace("vgId:%d, propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
return 1;
......@@ -983,8 +983,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
// init TLA+ server vars
pSyncNode->pRaftStore = raftStoreOpen(pSyncNode->raftStorePath);
if (pSyncNode->pRaftStore == NULL) {
if (raftStoreReadFile(pSyncNode) != 0) {
sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
goto _error;
......@@ -1184,7 +1183,7 @@ int32_t syncNodeRestore(SSyncNode* pSyncNode) {
int32_t syncNodeStart(SSyncNode* pSyncNode) {
// start raft
if (pSyncNode->replicaNum == 1) {
syncNodeBecomeLeader(pSyncNode, "one replica start");
// Raft 3.6.2 Committing entries from previous terms
......@@ -1202,7 +1201,7 @@ int32_t syncNodeStart(SSyncNode* pSyncNode) {
void syncNodeStartOld(SSyncNode* pSyncNode) {
// start raft
if (pSyncNode->replicaNum == 1) {
syncNodeBecomeLeader(pSyncNode, "one replica start");
// Raft 3.6.2 Committing entries from previous terms
......@@ -1288,10 +1287,6 @@ void syncNodeClose(SSyncNode* pSyncNode) {
if (pSyncNode == NULL) return;
sNInfo(pSyncNode, "sync close, node:%p", pSyncNode);
int32_t ret = raftStoreClose(pSyncNode->pRaftStore);
ASSERT(ret == 0);
pSyncNode->pRaftStore = NULL;
pSyncNode->pSyncRespMgr = NULL;
......@@ -1714,39 +1709,39 @@ _END:
// raft state change --------------
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
if (term > pSyncNode->pRaftStore->currentTerm) {
raftStoreSetTerm(pSyncNode->pRaftStore, term);
if (term > pSyncNode->raftStore.currentTerm) {
raftStoreSetTerm(pSyncNode, term);
char tmpBuf[64];
snprintf(tmpBuf, sizeof(tmpBuf), "update term to %" PRId64, term);
syncNodeBecomeFollower(pSyncNode, tmpBuf);
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
if (term > pSyncNode->pRaftStore->currentTerm) {
raftStoreSetTerm(pSyncNode->pRaftStore, term);
if (term > pSyncNode->raftStore.currentTerm) {
raftStoreSetTerm(pSyncNode, term);
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) {
if (pSyncNode->pRaftStore->currentTerm > newTerm) {
if (pSyncNode->raftStore.currentTerm > newTerm) {
sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm,
do {
sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm,
} while (0);
if (pSyncNode->pRaftStore->currentTerm < newTerm) {
raftStoreSetTerm(pSyncNode->pRaftStore, newTerm);
if (pSyncNode->raftStore.currentTerm < newTerm) {
raftStoreSetTerm(pSyncNode, newTerm);
char tmpBuf[64];
snprintf(tmpBuf, sizeof(tmpBuf), "step down, update term to %" PRId64, newTerm);
syncNodeBecomeFollower(pSyncNode, tmpBuf);
} else {
if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
......@@ -1904,7 +1899,7 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
ASSERT(lastIndex >= 0);
sInfo("vgId:%d, become leader. term: %" PRId64 ", commit index: %" PRId64 ", last index: %" PRId64 "",
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, lastIndex);
pSyncNode->vgId, pSyncNode->raftStore.currentTerm, pSyncNode->commitIndex, lastIndex);
void syncNodeCandidate2LeaderOld(SSyncNode* pSyncNode) {
......@@ -1937,7 +1932,7 @@ void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
sInfo("vgId:%d, become candidate from follower. term: %" PRId64 ", commit index: %" PRId64 ", last index: %" PRId64,
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, lastIndex);
pSyncNode->vgId, pSyncNode->raftStore.currentTerm, pSyncNode->commitIndex, lastIndex);
sNTrace(pSyncNode, "follower to candidate");
......@@ -1947,7 +1942,7 @@ void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
syncNodeBecomeFollower(pSyncNode, "leader to follower");
SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
sInfo("vgId:%d, become follower from leader. term: %" PRId64 ", commit index: %" PRId64 ", last index: %" PRId64,
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, lastIndex);
pSyncNode->vgId, pSyncNode->raftStore.currentTerm, pSyncNode->commitIndex, lastIndex);
sNTrace(pSyncNode, "leader to follower");
......@@ -1957,7 +1952,7 @@ void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
syncNodeBecomeFollower(pSyncNode, "candidate to follower");
SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
sInfo("vgId:%d, become follower from candidate. term: %" PRId64 ", commit index: %" PRId64 ", last index: %" PRId64,
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, lastIndex);
pSyncNode->vgId, pSyncNode->raftStore.currentTerm, pSyncNode->commitIndex, lastIndex);
sNTrace(pSyncNode, "candidate to follower");
......@@ -1965,15 +1960,15 @@ void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
// just called by syncNodeVoteForSelf
// need assert
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
ASSERT(term == pSyncNode->pRaftStore->currentTerm);
ASSERT(term == pSyncNode->raftStore.currentTerm);
raftStoreVote(pSyncNode->pRaftStore, pRaftId);
raftStoreVote(pSyncNode, pRaftId);
// simulate get vote from outside
void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
syncNodeVoteForTerm(pSyncNode, pSyncNode->pRaftStore->currentTerm, &pSyncNode->myRaftId);
syncNodeVoteForTerm(pSyncNode, pSyncNode->raftStore.currentTerm, &pSyncNode->myRaftId);
SRpcMsg rpcMsg = {0};
int32_t ret = syncBuildRequestVoteReply(&rpcMsg, pSyncNode->vgId);
......@@ -1982,7 +1977,7 @@ void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
SyncRequestVoteReply* pMsg = rpcMsg.pCont;
pMsg->srcId = pSyncNode->myRaftId;
pMsg->destId = pSyncNode->myRaftId;
pMsg->term = pSyncNode->pRaftStore->currentTerm;
pMsg->term = pSyncNode->raftStore.currentTerm;
pMsg->voteGranted = true;
voteGrantedVote(pSyncNode->pVotesGranted, pMsg);
......@@ -2272,13 +2267,6 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
if (pSyncNode->pRaftStore == NULL) {
sError("vgId:%d, hb timer raft store already stop", pSyncNode->vgId);
// sTrace("vgId:%d, eq peer hb timer", pSyncNode->vgId);
if (pSyncNode->replicaNum > 1) {
......@@ -2302,7 +2290,7 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
pSyncMsg->srcId = pSyncNode->myRaftId;
pSyncMsg->destId = pData->destId;
pSyncMsg->term = pSyncNode->pRaftStore->currentTerm;
pSyncMsg->term = pSyncNode->raftStore.currentTerm;
pSyncMsg->commitIndex = pSyncNode->commitIndex;
pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
pSyncMsg->privateTerm = 0;
......@@ -2348,7 +2336,7 @@ static int32_t syncNodeEqNoop(SSyncNode* pNode) {
SyncIndex index = pNode->pLogStore->syncLogWriteIndex(pNode->pLogStore);
SyncTerm term = pNode->pRaftStore->currentTerm;
SyncTerm term = pNode->raftStore.currentTerm;
SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, pNode->vgId);
if (pEntry == NULL) return -1;
......@@ -2394,8 +2382,7 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
if (syncLogBufferAppend(ths->pLogBuf, ths, pEntry) < 0) {
sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index);
(void)syncLogFsmExecute(ths, ths->pFsm, ths->state, ths->pRaftStore->currentTerm, pEntry,
(void)syncLogFsmExecute(ths, ths->pFsm, ths->state, ths->raftStore.currentTerm, pEntry, TSDB_CODE_SYN_BUFFER_FULL);
return -1;
......@@ -2468,7 +2455,7 @@ bool syncNodeSnapshotRecving(SSyncNode* pSyncNode) {
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
SyncIndex index = syncLogBufferGetEndIndex(ths->pLogBuf);
SyncTerm term = ths->pRaftStore->currentTerm;
SyncTerm term = ths->raftStore.currentTerm;
SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
if (pEntry == NULL) {
......@@ -2484,7 +2471,7 @@ static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
int32_t ret = 0;
SyncIndex index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
SyncTerm term = ths->pRaftStore->currentTerm;
SyncTerm term = ths->raftStore.currentTerm;
SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
ASSERT(pEntry != NULL);
......@@ -2526,12 +2513,12 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
SyncHeartbeatReply* pMsgReply = rpcMsg.pCont;
pMsgReply->destId = pMsg->srcId;
pMsgReply->srcId = ths->myRaftId;
pMsgReply->term = ths->pRaftStore->currentTerm;
pMsgReply->term = ths->raftStore.currentTerm;
pMsgReply->privateTerm = 8864; // magic number
pMsgReply->startTime = ths->startTime;
pMsgReply->timeStamp = tsMs;
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
if (pMsg->term == ths->raftStore.currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs);
......@@ -2560,7 +2547,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) {
if (pMsg->term >= ths->raftStore.currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) {
// syncNodeStepDown(ths, pMsg->term);
SRpcMsg rpcMsgLocalCmd = {0};
(void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId);
......@@ -2687,7 +2674,7 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn
int32_t code = 0;
SyncIndex index = syncLogBufferGetEndIndex(ths->pLogBuf);
SyncTerm term = ths->pRaftStore->currentTerm;
SyncTerm term = ths->raftStore.currentTerm;
SSyncRaftEntry* pEntry = NULL;
if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
pEntry = syncEntryBuildFromClientRequest(pMsg->pCont, term, index);
......@@ -2721,7 +2708,7 @@ int32_t syncNodeOnClientRequestOld(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRe
int32_t code = 0;
SyncIndex index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
SyncTerm term = ths->pRaftStore->currentTerm;
SyncTerm term = ths->raftStore.currentTerm;
SSyncRaftEntry* pEntry;
if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
......@@ -2755,7 +2742,7 @@ int32_t syncNodeOnClientRequestOld(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRe
.state = ths->state,
.seqNum = pEntry->seqNum,
.term = pEntry->term,
.currentTerm = ths->pRaftStore->currentTerm,
.currentTerm = ths->raftStore.currentTerm,
.flag = 0,
ths->pFsm->FpCommitCb(ths->pFsm, pMsg, &cbMeta);
......@@ -2833,7 +2820,7 @@ int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* p
return 0;
if (pEntry->term < ths->pRaftStore->currentTerm) {
if (pEntry->term < ths->raftStore.currentTerm) {
sNTrace(ths, "little term:%" PRId64 ", can not do leader transfer", pEntry->term);
return 0;
......@@ -2871,7 +2858,7 @@ int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* p
if (ths->pFsm->FpLeaderTransferCb != NULL) {
SFsmCbMeta cbMeta = {
.code = 0,
.currentTerm = ths->pRaftStore->currentTerm,
.currentTerm = ths->raftStore.currentTerm,
.flag = 0,
.index = pEntry->index,
.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index),
......@@ -2987,7 +2974,7 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde
.state = ths->state,
.seqNum = pEntry->seqNum,
.term = pEntry->term,
.currentTerm = ths->pRaftStore->currentTerm,
.currentTerm = ths->raftStore.currentTerm,
.flag = flag,
......@@ -176,7 +176,7 @@ int32_t syncBuildAppendEntriesFromRaftLog(SSyncNode* pNode, SSyncRaftEntry* pEnt
pMsg->prevLogTerm = prevLogTerm;
pMsg->vgId = pNode->vgId;
pMsg->srcId = pNode->myRaftId;
pMsg->term = pNode->pRaftStore->currentTerm;
pMsg->term = pNode->raftStore.currentTerm;
pMsg->commitIndex = pNode->commitIndex;
pMsg->privateTerm = 0;
return 0;
......@@ -285,9 +285,9 @@ SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf) {
int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm) {
int32_t ret = -1;
SyncIndex index = pEntry->index;
SyncIndex prevIndex = pEntry->index - 1;
int32_t ret = -1;
SyncIndex index = pEntry->index;
SyncIndex prevIndex = pEntry->index - 1;
SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTermWithoutLock(pBuf);
SSyncRaftEntry* pExist = NULL;
bool inBuf = true;
......@@ -509,7 +509,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
SSyncLogStore* pLogStore = pNode->pLogStore;
SSyncFSM* pFsm = pNode->pFsm;
ESyncState role = pNode->state;
SyncTerm term = pNode->pRaftStore->currentTerm;
SyncTerm term = pNode->raftStore.currentTerm;
SyncGroupId vgId = pNode->vgId;
int32_t ret = -1;
int64_t upperIndex = TMIN(commitIndex, pBuf->matchIndex);
......@@ -571,7 +571,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
// mark as restored if needed
if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex && pEntry != NULL &&
pNode->pRaftStore->currentTerm <= pEntry->term) {
pNode->raftStore.currentTerm <= pEntry->term) {
pNode->restoreFinish = true;
sInfo("vgId:%d, restore finished. log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId,
......@@ -614,9 +614,9 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
return -1;
int32_t ret = -1;
bool retried = false;
int64_t retryWaitMs = syncLogGetRetryBackoffTimeMs(pMgr);
int32_t ret = -1;
bool retried = false;
int64_t retryWaitMs = syncLogGetRetryBackoffTimeMs(pMgr);
int64_t nowMs = taosGetMonoTimestampMs();
int count = 0;
int64_t firstIndex = -1;
......@@ -807,9 +807,9 @@ int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
bool barrier = false;
SyncTerm term = -1;
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
bool barrier = false;
SyncTerm term = -1;
if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId,
terrstr(), index, pDestId->addr);
......@@ -836,11 +836,11 @@ int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode
int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
int32_t batchSize = TMAX(1, pMgr->size >> (4 + pMgr->retryBackoff));
int32_t count = 0;
int64_t nowMs = taosGetMonoTimestampMs();
int64_t limit = pMgr->size >> 1;
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
int32_t batchSize = TMAX(1, pMgr->size >> (4 + pMgr->retryBackoff));
int32_t count = 0;
int64_t nowMs = taosGetMonoTimestampMs();
int64_t limit = pMgr->size >> 1;
SyncTerm term = -1;
SyncIndex firstIndex = -1;
......@@ -891,13 +891,13 @@ int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* p
int32_t syncLogReplMgrProcessReplyInNormalMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
ASSERT(pMgr->restored == true);
if (pMgr->startIndex <= pMsg->lastSendIndex && pMsg->lastSendIndex < pMgr->endIndex) {
if (pMgr->startIndex < pMgr->matchIndex && pMgr->retryBackoff > 0) {
int64_t firstSentMs = pMgr->states[pMgr->startIndex % pMgr->size].timeMs;
int64_t lastSentMs = pMgr->states[(pMgr->endIndex - 1) % pMgr->size].timeMs;
int64_t timeDiffMs = lastSentMs - firstSentMs;
if (timeDiffMs > 0 && timeDiffMs < (SYNC_LOG_REPL_RETRY_WAIT_MS << (pMgr->retryBackoff - 1))) {
pMgr->retryBackoff -= 1;
if (pMgr->startIndex < pMgr->matchIndex && pMgr->retryBackoff > 0) {
int64_t firstSentMs = pMgr->states[pMgr->startIndex % pMgr->size].timeMs;
int64_t lastSentMs = pMgr->states[(pMgr->endIndex - 1) % pMgr->size].timeMs;
int64_t timeDiffMs = lastSentMs - firstSentMs;
if (timeDiffMs > 0 && timeDiffMs < (SYNC_LOG_REPL_RETRY_WAIT_MS << (pMgr->retryBackoff - 1))) {
pMgr->retryBackoff -= 1;
pMgr->states[pMsg->lastSendIndex % pMgr->size].acked = true;
pMgr->matchIndex = TMAX(pMgr->matchIndex, pMsg->matchIndex);
......@@ -16,156 +16,161 @@
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "tjson.h"
// private function
static int32_t raftStoreInit(SRaftStore *pRaftStore);
static bool raftStoreFileExist(char *path);
static int32_t raftStoreDecode(const SJson *pJson, SRaftStore *pStore) {
int32_t code = 0;
// public function
SRaftStore *raftStoreOpen(const char *path) {
int32_t ret;
tjsonGetNumberValue(pJson, "current_term", pStore->currentTerm, code);
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "vote_for_addr", pStore->voteFor.addr, code);
if (code < 0) return -1;
tjsonGetInt32ValueFromDouble(pJson, "vote_for_vgid", pStore->voteFor.vgId, code);
if (code < 0) return -1;
SRaftStore *pRaftStore = taosMemoryCalloc(1, sizeof(SRaftStore));
if (pRaftStore == NULL) {
return NULL;
snprintf(pRaftStore->path, sizeof(pRaftStore->path), "%s", path);
if (!raftStoreFileExist(pRaftStore->path)) {
ret = raftStoreInit(pRaftStore);
ASSERT(ret == 0);
char storeBuf[RAFT_STORE_BLOCK_SIZE] = {0};
pRaftStore->pFile = taosOpenFile(path, TD_FILE_READ | TD_FILE_WRITE);
ASSERT(pRaftStore->pFile != NULL);
int len = taosReadFile(pRaftStore->pFile, storeBuf, RAFT_STORE_BLOCK_SIZE);
ASSERT(len > 0);
ret = raftStoreDeserialize(pRaftStore, storeBuf, len);
ASSERT(ret == 0);
return pRaftStore;
static int32_t raftStoreInit(SRaftStore *pRaftStore) {
ASSERT(pRaftStore != NULL);
pRaftStore->pFile = taosOpenFile(pRaftStore->path, TD_FILE_CREATE | TD_FILE_WRITE);
ASSERT(pRaftStore->pFile != NULL);
pRaftStore->currentTerm = 0;
pRaftStore->voteFor.addr = 0;
pRaftStore->voteFor.vgId = 0;
int32_t ret = raftStorePersist(pRaftStore);
ASSERT(ret == 0);
return 0;
int32_t raftStoreClose(SRaftStore *pRaftStore) {
if (pRaftStore == NULL) return 0;
pRaftStore = NULL;
return 0;
int32_t raftStorePersist(SRaftStore *pRaftStore) {
ASSERT(pRaftStore != NULL);
int32_t ret;
char storeBuf[RAFT_STORE_BLOCK_SIZE] = {0};
ret = raftStoreSerialize(pRaftStore, storeBuf, sizeof(storeBuf));
ASSERT(ret == 0);
int32_t raftStoreReadFile(SSyncNode *pNode) {
int32_t code = -1;
TdFilePtr pFile = NULL;
char *pData = NULL;
SJson *pJson = NULL;
const char *file = pNode->raftStorePath;
SRaftStore *pStore = &pNode->raftStore;
if (taosStatFile(file, NULL, NULL) < 0) {
sInfo("vgId:%d, raft store file:%s not exist, use default value", pNode->vgId, file);
pStore->currentTerm = 0;
pStore->voteFor.addr = 0;
pStore->voteFor.vgId = 0;
return raftStoreWriteFile(pNode);
taosLSeekFile(pRaftStore->pFile, 0, SEEK_SET);
pFile = taosOpenFile(file, TD_FILE_READ);
if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
sError("vgId:%d, failed to open raft store file:%s since %s", pNode->vgId, file, terrstr());
goto _OVER;
ret = taosWriteFile(pRaftStore->pFile, storeBuf, sizeof(storeBuf));
int64_t size = 0;
if (taosFStatFile(pFile, &size, NULL) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
sError("vgId:%d, failed to fstat raft store file:%s since %s", pNode->vgId, file, terrstr());
goto _OVER;
return 0;
pData = taosMemoryMalloc(size + 1);
if (pData == NULL) {
goto _OVER;
static bool raftStoreFileExist(char *path) {
bool b = taosStatFile(path, NULL, NULL) >= 0;
return b;
if (taosReadFile(pFile, pData, size) != size) {
terrno = TAOS_SYSTEM_ERROR(errno);
sError("vgId:%d, failed to read raft store file:%s since %s", pNode->vgId, file, terrstr());
goto _OVER;
int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) {
ASSERT(pRaftStore != NULL);
pData[size] = '\0';
cJSON *pRoot = cJSON_CreateObject();
pJson = tjsonParse(pData);
if (pJson == NULL) {
goto _OVER;
char u64Buf[128] = {0};
snprintf(u64Buf, sizeof(u64Buf), "%" PRIu64 "", pRaftStore->currentTerm);
cJSON_AddStringToObject(pRoot, "current_term", u64Buf);
if (raftStoreDecode(pJson, pStore) < 0) {
goto _OVER;
snprintf(u64Buf, sizeof(u64Buf), "%" PRIu64 "", pRaftStore->voteFor.addr);
cJSON_AddStringToObject(pRoot, "vote_for_addr", u64Buf);
code = 0;
sInfo("vgId:%d, succceed to read raft store file %s", pNode->vgId, file);
cJSON_AddNumberToObject(pRoot, "vote_for_vgid", pRaftStore->voteFor.vgId);
if (pData != NULL) taosMemoryFree(pData);
if (pJson != NULL) cJSON_Delete(pJson);
if (pFile != NULL) taosCloseFile(&pFile);
char *serialized = cJSON_Print(pRoot);
int len2 = strlen(serialized);
ASSERT(len2 < len);
memset(buf, 0, len);
snprintf(buf, len, "%s", serialized);
if (code != 0) {
sError("vgId:%d, failed to read raft store file:%s since %s", pNode->vgId, file, terrstr());
return code;
static int32_t raftStoreEncode(SJson *pJson, SRaftStore *pStore) {
if (tjsonAddIntegerToObject(pJson, "current_term", pStore->currentTerm) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "vote_for_addr", pStore->voteFor.addr) < 0) return -1;
if (tjsonAddDoubleToObject(pJson, "vote_for_vgid", pStore->voteFor.vgId) < 0) return -1;
return 0;
int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) {
ASSERT(pRaftStore != NULL);
ASSERT(len > 0 && len <= RAFT_STORE_BLOCK_SIZE);
cJSON *pRoot = cJSON_Parse(buf);
cJSON *pCurrentTerm = cJSON_GetObjectItem(pRoot, "current_term");
sscanf(pCurrentTerm->valuestring, "%" PRIu64 "", &(pRaftStore->currentTerm));
cJSON *pVoteForAddr = cJSON_GetObjectItem(pRoot, "vote_for_addr");
sscanf(pVoteForAddr->valuestring, "%" PRIu64 "", &(pRaftStore->voteFor.addr));
cJSON *pVoteForVgid = cJSON_GetObjectItem(pRoot, "vote_for_vgid");
pRaftStore->voteFor.vgId = pVoteForVgid->valueint;
return 0;
int32_t raftStoreWriteFile(SSyncNode *pNode) {
int32_t code = -1;
char *buffer = NULL;
SJson *pJson = NULL;
TdFilePtr pFile = NULL;
const char *realfile = pNode->raftStorePath;
SRaftStore *pStore = &pNode->raftStore;
char file[PATH_MAX] = {0};
snprintf(file, sizeof(file), "%s.bak", realfile);
pJson = tjsonCreateObject();
if (pJson == NULL) goto _OVER;
if (raftStoreEncode(pJson, pStore) != 0) goto _OVER;
buffer = tjsonToString(pJson);
if (buffer == NULL) goto _OVER;
terrno = 0;
pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) goto _OVER;
int32_t len = strlen(buffer);
if (taosWriteFile(pFile, buffer, len) <= 0) goto _OVER;
if (taosFsyncFile(pFile) < 0) goto _OVER;
if (taosRenameFile(file, realfile) != 0) goto _OVER;
code = 0;
sInfo("vgId:%d, succeed to write raft store file:%s, len:%d", pNode->vgId, realfile, len);
if (pJson != NULL) tjsonDelete(pJson);
if (buffer != NULL) taosMemoryFree(buffer);
if (pFile != NULL) taosCloseFile(&pFile);
if (code != 0) {
if (terrno == 0) terrno = TAOS_SYSTEM_ERROR(errno);
sError("vgId:%d, failed to write raft store file:%s since %s", pNode->vgId, realfile, terrstr());
return code;
bool raftStoreHasVoted(SRaftStore *pRaftStore) {
bool b = syncUtilEmptyId(&(pRaftStore->voteFor));
bool raftStoreHasVoted(SSyncNode *pNode) {
bool b = syncUtilEmptyId(&pNode->raftStore.voteFor);
return (!b);
void raftStoreVote(SRaftStore *pRaftStore, SRaftId *pRaftId) {
pRaftStore->voteFor = *pRaftId;
void raftStoreVote(SSyncNode *pNode, SRaftId *pRaftId) {
pNode->raftStore.voteFor = *pRaftId;
void raftStoreClearVote(SRaftStore *pRaftStore) {
pRaftStore->voteFor = EMPTY_RAFT_ID;
void raftStoreClearVote(SSyncNode *pNode) {
pNode->raftStore.voteFor = EMPTY_RAFT_ID;
void raftStoreNextTerm(SRaftStore *pRaftStore) {
void raftStoreNextTerm(SSyncNode *pNode) {
void raftStoreSetTerm(SRaftStore *pRaftStore, SyncTerm term) {
pRaftStore->currentTerm = term;
void raftStoreSetTerm(SSyncNode *pNode, SyncTerm term) {
pNode->raftStore.currentTerm = term;
......@@ -122,7 +122,7 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapsh
pMsg->srcId = pSyncNode->myRaftId;
pMsg->destId = *pDestId;
pMsg->term = pSyncNode->pRaftStore->currentTerm;
pMsg->term = pSyncNode->raftStore.currentTerm;
pMsg->prevLogIndex = preLogIndex;
pMsg->prevLogTerm = preLogTerm;
pMsg->commitIndex = pSyncNode->commitIndex;
......@@ -245,7 +245,7 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) {
SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
pSyncMsg->srcId = pSyncNode->myRaftId;
pSyncMsg->destId = pSyncNode->peersId[i];
pSyncMsg->term = pSyncNode->pRaftStore->currentTerm;
pSyncMsg->term = pSyncNode->raftStore.currentTerm;
pSyncMsg->commitIndex = pSyncNode->commitIndex;
pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
pSyncMsg->privateTerm = 0;
......@@ -44,12 +44,12 @@
// /\ UNCHANGED <<state, currentTerm, candidateVars, leaderVars, logVars>>
static bool syncNodeOnRequestVoteLogOK(SSyncNode* pSyncNode, SyncRequestVote* pMsg) {
SyncTerm myLastTerm = syncNodeGetLastTerm(pSyncNode);
SyncIndex myLastIndex = syncNodeGetLastIndex(pSyncNode);
static bool syncNodeOnRequestVoteLogOK(SSyncNode* ths, SyncRequestVote* pMsg) {
SyncTerm myLastTerm = syncNodeGetLastTerm(ths);
SyncIndex myLastIndex = syncNodeGetLastIndex(ths);
if (pMsg->lastLogIndex < pSyncNode->commitIndex) {
if (pMsg->lastLogIndex < ths->commitIndex) {
"logok:0, {my-lterm:%" PRIu64 ", my-lindex:%" PRId64 ", recv-lterm:%" PRIu64 ", recv-lindex:%" PRId64
", recv-term:%" PRIu64 "}",
myLastTerm, myLastIndex, pMsg->lastLogTerm, pMsg->lastLogIndex, pMsg->term);
......@@ -58,7 +58,7 @@ static bool syncNodeOnRequestVoteLogOK(SSyncNode* pSyncNode, SyncRequestVote* pM
if (myLastTerm == SYNC_TERM_INVALID) {
"logok:0, {my-lterm:%" PRIu64 ", my-lindex:%" PRId64 ", recv-lterm:%" PRIu64 ", recv-lindex:%" PRId64
", recv-term:%" PRIu64 "}",
myLastTerm, myLastIndex, pMsg->lastLogTerm, pMsg->lastLogIndex, pMsg->term);
......@@ -66,7 +66,7 @@ static bool syncNodeOnRequestVoteLogOK(SSyncNode* pSyncNode, SyncRequestVote* pM
if (pMsg->lastLogTerm > myLastTerm) {
"logok:1, {my-lterm:%" PRIu64 ", my-lindex:%" PRId64 ", recv-lterm:%" PRIu64 ", recv-lindex:%" PRId64
", recv-term:%" PRIu64 "}",
myLastTerm, myLastIndex, pMsg->lastLogTerm, pMsg->lastLogIndex, pMsg->term);
......@@ -74,14 +74,14 @@ static bool syncNodeOnRequestVoteLogOK(SSyncNode* pSyncNode, SyncRequestVote* pM
if (pMsg->lastLogTerm == myLastTerm && pMsg->lastLogIndex >= myLastIndex) {
"logok:1, {my-lterm:%" PRIu64 ", my-lindex:%" PRId64 ", recv-lterm:%" PRIu64 ", recv-lindex:%" PRId64
", recv-term:%" PRIu64 "}",
myLastTerm, myLastIndex, pMsg->lastLogTerm, pMsg->lastLogIndex, pMsg->term);
return true;
"logok:0, {my-lterm:%" PRIu64 ", my-lindex:%" PRId64 ", recv-lterm:%" PRIu64 ", recv-lindex:%" PRId64
", recv-term:%" PRIu64 "}",
myLastTerm, myLastIndex, pMsg->lastLogTerm, pMsg->lastLogIndex, pMsg->term);
......@@ -93,7 +93,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
SyncRequestVote* pMsg = pRpcMsg->pCont;
// if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
if (!syncNodeInRaftGroup(ths, &pMsg->srcId)) {
syncLogRecvRequestVote(ths, pMsg, -1, "not in my config");
return -1;
......@@ -101,21 +101,21 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
bool logOK = syncNodeOnRequestVoteLogOK(ths, pMsg);
// maybe update term
if (pMsg->term > ths->pRaftStore->currentTerm) {
if (pMsg->term > ths->raftStore.currentTerm) {
syncNodeStepDown(ths, pMsg->term);
// syncNodeUpdateTerm(ths, pMsg->term);
ASSERT(pMsg->term <= ths->pRaftStore->currentTerm);
ASSERT(pMsg->term <= ths->raftStore.currentTerm);
bool grant = (pMsg->term == ths->pRaftStore->currentTerm) && logOK &&
((!raftStoreHasVoted(ths->pRaftStore)) || (syncUtilSameId(&(ths->pRaftStore->voteFor), &(pMsg->srcId))));
bool grant = (pMsg->term == ths->raftStore.currentTerm) && logOK &&
((!raftStoreHasVoted(ths)) || (syncUtilSameId(&ths->raftStore.voteFor, &pMsg->srcId)));
if (grant) {
// maybe has already voted for pMsg->srcId
// vote again, no harm
raftStoreVote(ths->pRaftStore, &(pMsg->srcId));
raftStoreVote(ths, &(pMsg->srcId));
// candidate ?
syncNodeStepDown(ths, ths->pRaftStore->currentTerm);
syncNodeStepDown(ths, ths->raftStore.currentTerm);
// forbid elect for this round
......@@ -129,7 +129,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
SyncRequestVoteReply* pReply = rpcMsg.pCont;
pReply->srcId = ths->myRaftId;
pReply->destId = pMsg->srcId;
pReply->term = ths->pRaftStore->currentTerm;
pReply->term = ths->raftStore.currentTerm;
pReply->voteGranted = grant;
// trace log
......@@ -49,25 +49,25 @@ int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
// drop stale response
if (pMsg->term < ths->pRaftStore->currentTerm) {
if (pMsg->term < ths->raftStore.currentTerm) {
syncLogRecvRequestVoteReply(ths, pMsg, "drop stale response");
return -1;
// ASSERT(!(pMsg->term > ths->pRaftStore->currentTerm));
// ASSERT(!(pMsg->term > ths->raftStore.currentTerm));
// no need this code, because if I receive reply.term, then I must have sent for that term.
// if (pMsg->term > ths->pRaftStore->currentTerm) {
// if (pMsg->term > ths->raftStore.currentTerm) {
// syncNodeUpdateTerm(ths, pMsg->term);
// }
if (pMsg->term > ths->pRaftStore->currentTerm) {
if (pMsg->term > ths->raftStore.currentTerm) {
syncLogRecvRequestVoteReply(ths, pMsg, "error term");
syncNodeStepDown(ths, pMsg->term);
return -1;
syncLogRecvRequestVoteReply(ths, pMsg, "");
ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
ASSERT(pMsg->term == ths->raftStore.currentTerm);
// This tallies votes even when the current state is not Candidate,
// but they won't be looked at, so it doesn't matter.
......@@ -143,7 +143,7 @@ static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
.state = pNode->state,
.seqNum = *pSeqNum,
.currentTerm = pNode->pRaftStore->currentTerm,
.currentTerm = pNode->raftStore.currentTerm,
.flag = 0,
......@@ -43,7 +43,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS;
pSender->pSyncNode = pSyncNode;
pSender->replicaIndex = replicaIndex;
pSender->term = pSyncNode->pRaftStore->currentTerm;
pSender->term = pSyncNode->raftStore.currentTerm;
pSender->startTime = 0;
pSender->endTime = 0;
pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot);
......@@ -90,7 +90,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
memset(&pSender->lastConfig, 0, sizeof(pSender->lastConfig));
pSender->sendingMS = 0;
pSender->term = pSender->pSyncNode->pRaftStore->currentTerm;
pSender->term = pSender->pSyncNode->raftStore.currentTerm;
pSender->startTime = taosGetTimestampMs();
pSender->lastSendTime = pSender->startTime;
pSender->finish = false;
......@@ -105,7 +105,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
SyncSnapshotSend *pMsg = rpcMsg.pCont;
pMsg->srcId = pSender->pSyncNode->myRaftId;
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
pMsg->term = pSender->pSyncNode->raftStore.currentTerm;
pMsg->beginIndex = pSender->snapshotParam.start;
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
......@@ -185,7 +185,7 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
SyncSnapshotSend *pMsg = rpcMsg.pCont;
pMsg->srcId = pSender->pSyncNode->myRaftId;
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
pMsg->term = pSender->pSyncNode->raftStore.currentTerm;
pMsg->beginIndex = pSender->snapshotParam.start;
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
......@@ -226,7 +226,7 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
SyncSnapshotSend *pMsg = rpcMsg.pCont;
pMsg->srcId = pSender->pSyncNode->myRaftId;
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
pMsg->term = pSender->pSyncNode->raftStore.currentTerm;
pMsg->beginIndex = pSender->snapshotParam.start;
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
......@@ -314,7 +314,7 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from
pReceiver->pWriter = NULL;
pReceiver->pSyncNode = pSyncNode;
pReceiver->fromId = fromId;
pReceiver->term = pSyncNode->pRaftStore->currentTerm;
pReceiver->term = pSyncNode->raftStore.currentTerm;
pReceiver->snapshot.data = NULL;
pReceiver->snapshot.lastApplyIndex = SYNC_INDEX_INVALID;
pReceiver->snapshot.lastApplyTerm = 0;
......@@ -380,7 +380,7 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *p
pReceiver->start = true;
pReceiver->term = pReceiver->pSyncNode->pRaftStore->currentTerm;
pReceiver->term = pReceiver->pSyncNode->raftStore.currentTerm;
pReceiver->fromId = pPreMsg->srcId;
pReceiver->startTime = pPreMsg->startTime;
......@@ -437,9 +437,9 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap
// maybe update term
if (pReceiver->snapshot.lastApplyTerm > pReceiver->pSyncNode->pRaftStore->currentTerm) {
pReceiver->pSyncNode->pRaftStore->currentTerm = pReceiver->snapshot.lastApplyTerm;
if (pReceiver->snapshot.lastApplyTerm > pReceiver->pSyncNode->raftStore.currentTerm) {
pReceiver->pSyncNode->raftStore.currentTerm = pReceiver->snapshot.lastApplyTerm;
// stop writer, apply data
......@@ -592,7 +592,7 @@ _SEND_REPLY:
SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
pRspMsg->srcId = pSyncNode->myRaftId;
pRspMsg->destId = pMsg->srcId;
pRspMsg->term = pSyncNode->pRaftStore->currentTerm;
pRspMsg->term = pSyncNode->raftStore.currentTerm;
pRspMsg->lastIndex = pMsg->lastIndex;
pRspMsg->lastTerm = pMsg->lastTerm;
pRspMsg->startTime = pReceiver->startTime;
......@@ -648,7 +648,7 @@ _SEND_REPLY:
SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
pRspMsg->srcId = pSyncNode->myRaftId;
pRspMsg->destId = pMsg->srcId;
pRspMsg->term = pSyncNode->pRaftStore->currentTerm;
pRspMsg->term = pSyncNode->raftStore.currentTerm;
pRspMsg->lastIndex = pMsg->lastIndex;
pRspMsg->lastTerm = pMsg->lastTerm;
pRspMsg->startTime = pReceiver->startTime;
......@@ -698,7 +698,7 @@ static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend
SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
pRspMsg->srcId = pSyncNode->myRaftId;
pRspMsg->destId = pMsg->srcId;
pRspMsg->term = pSyncNode->pRaftStore->currentTerm;
pRspMsg->term = pSyncNode->raftStore.currentTerm;
pRspMsg->lastIndex = pMsg->lastIndex;
pRspMsg->lastTerm = pMsg->lastTerm;
pRspMsg->startTime = pReceiver->startTime;
......@@ -745,7 +745,7 @@ static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMs
SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
pRspMsg->srcId = pSyncNode->myRaftId;
pRspMsg->destId = pMsg->srcId;
pRspMsg->term = pSyncNode->pRaftStore->currentTerm;
pRspMsg->term = pSyncNode->raftStore.currentTerm;
pRspMsg->lastIndex = pMsg->lastIndex;
pRspMsg->lastTerm = pMsg->lastTerm;
pRspMsg->startTime = pReceiver->startTime;
......@@ -794,13 +794,13 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
return -1;
if (pMsg->term < pSyncNode->pRaftStore->currentTerm) {
if (pMsg->term < pSyncNode->raftStore.currentTerm) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "reject since small term");
return -1;
if (pMsg->term > pSyncNode->pRaftStore->currentTerm) {
if (pMsg->term > pSyncNode->raftStore.currentTerm) {
syncNodeStepDown(pSyncNode, pMsg->term);
......@@ -808,7 +808,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
// state, term, seq/ack
int32_t code = 0;
if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
if (pMsg->term == pSyncNode->raftStore.currentTerm) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq pre-snapshot");
code = syncNodeOnSnapshotPre(pSyncNode, pMsg);
......@@ -892,7 +892,7 @@ static int32_t syncNodeOnSnapshotPreRsp(SSyncNode *pSyncNode, SSyncSnapshotSende
SyncSnapshotSend *pSendMsg = rpcMsg.pCont;
pSendMsg->srcId = pSender->pSyncNode->myRaftId;
pSendMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
pSendMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
pSendMsg->term = pSender->pSyncNode->raftStore.currentTerm;
pSendMsg->beginIndex = pSender->snapshotParam.start;
pSendMsg->lastIndex = pSender->snapshot.lastApplyIndex;
pSendMsg->lastTerm = pSender->snapshot.lastApplyTerm;
......@@ -951,10 +951,10 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
goto _ERROR;
if (pMsg->term != pSyncNode->pRaftStore->currentTerm) {
if (pMsg->term != pSyncNode->raftStore.currentTerm) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender and receiver term not match");
sSError(pSender, "snapshot sender term not equal, msg term:%" PRId64 " currentTerm:%" PRId64, pMsg->term,
goto _ERROR;
......@@ -158,8 +158,8 @@ static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...) {
if (pNode == NULL || pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return;
int64_t currentTerm = pNode->pRaftStore->currentTerm;
if (pNode == NULL || pNode->pLogStore == NULL) return;
int64_t currentTerm = pNode->raftStore.currentTerm;
// save error code, otherwise it will be overwritten
int32_t errCode = terrno;
......@@ -228,7 +228,7 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotSender* pSender,
const char* format, ...) {
SSyncNode* pNode = pSender->pSyncNode;
if (pNode == NULL || pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return;
if (pNode == NULL || pNode->pLogStore == NULL) return;
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) {
......@@ -264,7 +264,7 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla
pNode->vgId, eventLog, syncStr(pNode->state), pSender, pSender->snapshotParam.start,
pSender->snapshotParam.end, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm,
pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack, pSender->finish, pSender->replicaIndex,
DID(&pNode->replicasId[pSender->replicaIndex]), pNode->pRaftStore->currentTerm, pNode->commitIndex,
DID(&pNode->replicasId[pSender->replicaIndex]), pNode->raftStore.currentTerm, pNode->commitIndex,
logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
pNode->raftCfg.isStandBy, pNode->raftCfg.snapshotStrategy, pNode->raftCfg.batchSize, pNode->replicaNum,
pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, syncNodeDynamicQuorum(pNode),
......@@ -274,7 +274,7 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla
void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotReceiver* pReceiver,
const char* format, ...) {
SSyncNode* pNode = pReceiver->pSyncNode;
if (pNode == NULL || pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return;
if (pNode == NULL || pNode->pLogStore == NULL) return;
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) {
......@@ -311,7 +311,7 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df
pNode->vgId, eventLog, syncStr(pNode->state), pReceiver, pReceiver->start, pReceiver->ack,
pReceiver->term, pReceiver->startTime, DID(&pReceiver->fromId), pReceiver->snapshotParam.start,
pReceiver->snapshotParam.end, pReceiver->snapshot.lastApplyIndex, pReceiver->snapshot.lastApplyTerm,
pReceiver->snapshot.lastConfigIndex, pNode->pRaftStore->currentTerm, pNode->commitIndex, logBeginIndex,
pReceiver->snapshot.lastConfigIndex, pNode->raftStore.currentTerm, pNode->commitIndex, logBeginIndex,
logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
pNode->raftCfg.isStandBy, pNode->raftCfg.snapshotStrategy, pNode->raftCfg.batchSize, pNode->replicaNum,
pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, syncNodeDynamicQuorum(pNode),
......@@ -80,7 +80,7 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
// tla+ server vars
cJSON_AddNumberToObject(pRoot, "state", pSyncNode->state);
cJSON_AddStringToObject(pRoot, "state_str", syncStr(pSyncNode->state));
cJSON_AddItemToObject(pRoot, "pRaftStore", raftStore2Json(pSyncNode->pRaftStore));
cJSON_AddItemToObject(pRoot, "pRaftStore", raftStore2Json(&pSyncNode.raftStore));
// tla+ candidate vars
cJSON_AddItemToObject(pRoot, "pVotesGranted", voteGranted2Json(pSyncNode->pVotesGranted));
......@@ -199,7 +199,7 @@ inline char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
", sby:%d, "
"r-num:%d, "
"lcfg:%" PRId64 ", chging:%d, rsto:%d",
pSyncNode->vgId, syncStr(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex,
pSyncNode->vgId, syncStr(pSyncNode->state), pSyncNode->raftStore.currentTerm, pSyncNode->commitIndex,
logBeginIndex, logLastIndex, snapshot.lastApplyIndex, pSyncNode->raftCfg.isStandBy, pSyncNode->replicaNum,
pSyncNode->raftCfg.lastConfigIndex, pSyncNode->changing, pSyncNode->restoreFinish);
......@@ -137,7 +137,7 @@ int32_t syncNodeOnPreSnapshot(SSyncNode *ths, SyncPreSnapshot *pMsg) {
SyncPreSnapshotReply *pMsgReply = syncPreSnapshotReplyBuild(ths->vgId);
pMsgReply->srcId = ths->myRaftId;
pMsgReply->destId = pMsg->srcId;
pMsgReply->term = ths->pRaftStore->currentTerm;
pMsgReply->term = ths->raftStore.currentTerm;
SSyncLogStoreData *pData = ths->pLogStore->data;
SWal *pWal = pData->pWal;
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册