未验证 提交 1cc5f593 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #19445 from taosdata/fix/TD-21644

enh: refact raft store file read write
...@@ -32,11 +32,9 @@ typedef struct SyncRequestVoteReply SyncRequestVoteReply; ...@@ -32,11 +32,9 @@ typedef struct SyncRequestVoteReply SyncRequestVoteReply;
typedef struct SyncAppendEntries SyncAppendEntries; typedef struct SyncAppendEntries SyncAppendEntries;
typedef struct SyncAppendEntriesReply SyncAppendEntriesReply; typedef struct SyncAppendEntriesReply SyncAppendEntriesReply;
typedef struct SSyncEnv SSyncEnv; typedef struct SSyncEnv SSyncEnv;
typedef struct SRaftStore SRaftStore;
typedef struct SVotesGranted SVotesGranted; typedef struct SVotesGranted SVotesGranted;
typedef struct SVotesRespond SVotesRespond; typedef struct SVotesRespond SVotesRespond;
typedef struct SSyncIndexMgr SSyncIndexMgr; typedef struct SSyncIndexMgr SSyncIndexMgr;
typedef struct SRaftCfg SRaftCfg;
typedef struct SSyncRespMgr SSyncRespMgr; typedef struct SSyncRespMgr SSyncRespMgr;
typedef struct SSyncSnapshotSender SSyncSnapshotSender; typedef struct SSyncSnapshotSender SSyncSnapshotSender;
typedef struct SSyncSnapshotReceiver SSyncSnapshotReceiver; typedef struct SSyncSnapshotReceiver SSyncSnapshotReceiver;
...@@ -70,6 +68,11 @@ typedef struct SRaftId { ...@@ -70,6 +68,11 @@ typedef struct SRaftId {
SyncGroupId vgId; SyncGroupId vgId;
} SRaftId; } SRaftId;
typedef struct SRaftStore {
SyncTerm currentTerm;
SRaftId voteFor;
} SRaftStore;
typedef struct SSyncHbTimerData { typedef struct SSyncHbTimerData {
int64_t syncNodeRid; int64_t syncNodeRid;
SSyncTimer* pTimer; SSyncTimer* pTimer;
...@@ -112,8 +115,8 @@ typedef struct SSyncNode { ...@@ -112,8 +115,8 @@ typedef struct SSyncNode {
// sync io // sync io
SSyncLogBuffer* pLogBuf; SSyncLogBuffer* pLogBuf;
SWal* pWal; SWal* pWal;
const SMsgCb* msgcb; const SMsgCb* msgcb;
int32_t (*syncSendMSg)(const SEpSet* pEpSet, SRpcMsg* pMsg); int32_t (*syncSendMSg)(const SEpSet* pEpSet, SRpcMsg* pMsg);
int32_t (*syncEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); int32_t (*syncEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
int32_t (*syncEqCtrlMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); int32_t (*syncEqCtrlMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
...@@ -139,8 +142,8 @@ typedef struct SSyncNode { ...@@ -139,8 +142,8 @@ typedef struct SSyncNode {
int64_t rid; int64_t rid;
// tla+ server vars // tla+ server vars
ESyncState state; ESyncState state;
SRaftStore* pRaftStore; SRaftStore raftStore;
// tla+ candidate vars // tla+ candidate vars
SVotesGranted* pVotesGranted; SVotesGranted* pVotesGranted;
...@@ -229,7 +232,7 @@ int32_t syncNodeStartStandBy(SSyncNode* pSyncNode); ...@@ -229,7 +232,7 @@ int32_t syncNodeStartStandBy(SSyncNode* pSyncNode);
void syncNodeClose(SSyncNode* pSyncNode); void syncNodeClose(SSyncNode* pSyncNode);
void syncNodePreClose(SSyncNode* pSyncNode); void syncNodePreClose(SSyncNode* pSyncNode);
void syncNodePostClose(SSyncNode* pSyncNode); void syncNodePostClose(SSyncNode* pSyncNode);
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t *seq); int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq);
int32_t syncNodeRestore(SSyncNode* pSyncNode); int32_t syncNodeRestore(SSyncNode* pSyncNode);
void syncHbTimerDataFree(SSyncHbTimerData* pData); void syncHbTimerDataFree(SSyncHbTimerData* pData);
......
...@@ -24,25 +24,16 @@ extern "C" { ...@@ -24,25 +24,16 @@ extern "C" {
#define RAFT_STORE_BLOCK_SIZE 512 #define RAFT_STORE_BLOCK_SIZE 512
#define RAFT_STORE_PATH_LEN (TSDB_FILENAME_LEN * 2) #define RAFT_STORE_PATH_LEN (TSDB_FILENAME_LEN * 2)
#define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0})
#define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0}) int32_t raftStoreReadFile(SSyncNode *pNode);
int32_t raftStoreWriteFile(SSyncNode *pNode);
typedef struct SRaftStore { bool raftStoreHasVoted(SSyncNode *pNode);
SyncTerm currentTerm; void raftStoreVote(SSyncNode *pNode, SRaftId *pRaftId);
SRaftId voteFor; void raftStoreClearVote(SSyncNode *pNode);
TdFilePtr pFile; void raftStoreNextTerm(SSyncNode *pNode);
char path[RAFT_STORE_PATH_LEN]; void raftStoreSetTerm(SSyncNode *pNode, SyncTerm term);
} SRaftStore;
SRaftStore *raftStoreOpen(const char *path);
int32_t raftStoreClose(SRaftStore *pRaftStore);
int32_t raftStorePersist(SRaftStore *pRaftStore);
bool raftStoreHasVoted(SRaftStore *pRaftStore);
void raftStoreVote(SRaftStore *pRaftStore, SRaftId *pRaftId);
void raftStoreClearVote(SRaftStore *pRaftStore);
void raftStoreNextTerm(SRaftStore *pRaftStore);
void raftStoreSetTerm(SRaftStore *pRaftStore, SyncTerm term);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -159,17 +159,17 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -159,17 +159,17 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
// prepare response msg // prepare response msg
pReply->srcId = ths->myRaftId; pReply->srcId = ths->myRaftId;
pReply->destId = pMsg->srcId; pReply->destId = pMsg->srcId;
pReply->term = ths->pRaftStore->currentTerm; pReply->term = ths->raftStore.currentTerm;
pReply->success = false; pReply->success = false;
pReply->matchIndex = SYNC_INDEX_INVALID; pReply->matchIndex = SYNC_INDEX_INVALID;
pReply->lastSendIndex = pMsg->prevLogIndex + 1; pReply->lastSendIndex = pMsg->prevLogIndex + 1;
pReply->startTime = ths->startTime; pReply->startTime = ths->startTime;
if (pMsg->term < ths->pRaftStore->currentTerm) { if (pMsg->term < ths->raftStore.currentTerm) {
goto _SEND_RESPONSE; goto _SEND_RESPONSE;
} }
if (pMsg->term > ths->pRaftStore->currentTerm) { if (pMsg->term > ths->raftStore.currentTerm) {
pReply->term = pMsg->term; pReply->term = pMsg->term;
} }
...@@ -253,19 +253,19 @@ int32_t syncNodeOnAppendEntriesOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -253,19 +253,19 @@ int32_t syncNodeOnAppendEntriesOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
SyncAppendEntriesReply* pReply = rpcRsp.pCont; SyncAppendEntriesReply* pReply = rpcRsp.pCont;
pReply->srcId = ths->myRaftId; pReply->srcId = ths->myRaftId;
pReply->destId = pMsg->srcId; pReply->destId = pMsg->srcId;
pReply->term = ths->pRaftStore->currentTerm; pReply->term = ths->raftStore.currentTerm;
pReply->success = false; pReply->success = false;
// pReply->matchIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore); // pReply->matchIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
pReply->matchIndex = SYNC_INDEX_INVALID; pReply->matchIndex = SYNC_INDEX_INVALID;
pReply->lastSendIndex = pMsg->prevLogIndex + 1; pReply->lastSendIndex = pMsg->prevLogIndex + 1;
pReply->startTime = ths->startTime; pReply->startTime = ths->startTime;
if (pMsg->term < ths->pRaftStore->currentTerm) { if (pMsg->term < ths->raftStore.currentTerm) {
syncLogRecvAppendEntries(ths, pMsg, "reject, small term"); syncLogRecvAppendEntries(ths, pMsg, "reject, small term");
goto _SEND_RESPONSE; goto _SEND_RESPONSE;
} }
if (pMsg->term > ths->pRaftStore->currentTerm) { if (pMsg->term > ths->raftStore.currentTerm) {
pReply->term = pMsg->term; pReply->term = pMsg->term;
} }
......
...@@ -50,19 +50,19 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -50,19 +50,19 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
} }
// drop stale response // drop stale response
if (pMsg->term < ths->pRaftStore->currentTerm) { if (pMsg->term < ths->raftStore.currentTerm) {
syncLogRecvAppendEntriesReply(ths, pMsg, "drop stale response"); syncLogRecvAppendEntriesReply(ths, pMsg, "drop stale response");
return 0; return 0;
} }
if (ths->state == TAOS_SYNC_STATE_LEADER) { if (ths->state == TAOS_SYNC_STATE_LEADER) {
if (pMsg->term > ths->pRaftStore->currentTerm) { if (pMsg->term > ths->raftStore.currentTerm) {
syncLogRecvAppendEntriesReply(ths, pMsg, "error term"); syncLogRecvAppendEntriesReply(ths, pMsg, "error term");
syncNodeStepDown(ths, pMsg->term); syncNodeStepDown(ths, pMsg->term);
return -1; return -1;
} }
ASSERT(pMsg->term == ths->pRaftStore->currentTerm); ASSERT(pMsg->term == ths->raftStore.currentTerm);
sTrace("vgId:%d, received append entries reply. srcId:0x%016" PRIx64 ", term:%" PRId64 ", matchIndex:%" PRId64 "", sTrace("vgId:%d, received append entries reply. srcId:0x%016" PRIx64 ", term:%" PRId64 ", matchIndex:%" PRId64 "",
pMsg->vgId, pMsg->srcId.addr, pMsg->term, pMsg->matchIndex); pMsg->vgId, pMsg->srcId.addr, pMsg->term, pMsg->matchIndex);
...@@ -100,19 +100,19 @@ int32_t syncNodeOnAppendEntriesReplyOld(SSyncNode* ths, SyncAppendEntriesReply* ...@@ -100,19 +100,19 @@ int32_t syncNodeOnAppendEntriesReplyOld(SSyncNode* ths, SyncAppendEntriesReply*
} }
// drop stale response // drop stale response
if (pMsg->term < ths->pRaftStore->currentTerm) { if (pMsg->term < ths->raftStore.currentTerm) {
syncLogRecvAppendEntriesReply(ths, pMsg, "drop stale response"); syncLogRecvAppendEntriesReply(ths, pMsg, "drop stale response");
return 0; return 0;
} }
if (ths->state == TAOS_SYNC_STATE_LEADER) { if (ths->state == TAOS_SYNC_STATE_LEADER) {
if (pMsg->term > ths->pRaftStore->currentTerm) { if (pMsg->term > ths->raftStore.currentTerm) {
syncLogRecvAppendEntriesReply(ths, pMsg, "error term"); syncLogRecvAppendEntriesReply(ths, pMsg, "error term");
syncNodeStepDown(ths, pMsg->term); syncNodeStepDown(ths, pMsg->term);
return -1; return -1;
} }
ASSERT(pMsg->term == ths->pRaftStore->currentTerm); ASSERT(pMsg->term == ths->raftStore.currentTerm);
if (pMsg->success) { if (pMsg->success) {
SyncIndex oldMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId)); SyncIndex oldMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
......
...@@ -133,7 +133,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { ...@@ -133,7 +133,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
} }
} }
// cannot commit, even if quorum agree. need check term! // cannot commit, even if quorum agree. need check term!
if (pEntry->term <= pSyncNode->pRaftStore->currentTerm) { if (pEntry->term <= pSyncNode->raftStore.currentTerm) {
// update commit index // update commit index
newCommitIndex = index; newCommitIndex = index;
...@@ -329,7 +329,7 @@ int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely) { ...@@ -329,7 +329,7 @@ int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely) {
SyncIndex commitIndex = indexLikely; SyncIndex commitIndex = indexLikely;
syncNodeUpdateCommitIndex(ths, commitIndex); syncNodeUpdateCommitIndex(ths, commitIndex);
sTrace("vgId:%d, agreed upon. role:%d, term:%" PRId64 ", index: %" PRId64 "", ths->vgId, ths->state, sTrace("vgId:%d, agreed upon. role:%d, term:%" PRId64 ", index: %" PRId64 "", ths->vgId, ths->state,
ths->pRaftStore->currentTerm, commitIndex); ths->raftStore.currentTerm, commitIndex);
} }
return ths->commitIndex; return ths->commitIndex;
} }
...@@ -48,7 +48,7 @@ static int32_t syncNodeRequestVotePeers(SSyncNode* pNode) { ...@@ -48,7 +48,7 @@ static int32_t syncNodeRequestVotePeers(SSyncNode* pNode) {
SyncRequestVote* pMsg = rpcMsg.pCont; SyncRequestVote* pMsg = rpcMsg.pCont;
pMsg->srcId = pNode->myRaftId; pMsg->srcId = pNode->myRaftId;
pMsg->destId = pNode->peersId[i]; pMsg->destId = pNode->peersId[i];
pMsg->term = pNode->pRaftStore->currentTerm; pMsg->term = pNode->raftStore.currentTerm;
ret = syncNodeGetLastIndexTerm(pNode, &pMsg->lastLogIndex, &pMsg->lastLogTerm); ret = syncNodeGetLastIndexTerm(pNode, &pMsg->lastLogIndex, &pMsg->lastLogTerm);
ASSERT(ret == 0); ASSERT(ret == 0);
...@@ -75,10 +75,10 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) { ...@@ -75,10 +75,10 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) {
} }
// start election // start election
raftStoreNextTerm(pSyncNode->pRaftStore); raftStoreNextTerm(pSyncNode);
raftStoreClearVote(pSyncNode->pRaftStore); raftStoreClearVote(pSyncNode);
voteGrantedReset(pSyncNode->pVotesGranted, pSyncNode->pRaftStore->currentTerm); voteGrantedReset(pSyncNode->pVotesGranted, pSyncNode->raftStore.currentTerm);
votesRespondReset(pSyncNode->pVotesRespond, pSyncNode->pRaftStore->currentTerm); votesRespondReset(pSyncNode->pVotesRespond, pSyncNode->raftStore.currentTerm);
syncNodeVoteForSelf(pSyncNode); syncNodeVoteForSelf(pSyncNode);
if (voteGrantedMajority(pSyncNode->pVotesGranted)) { if (voteGrantedMajority(pSyncNode->pVotesGranted)) {
......
...@@ -468,7 +468,7 @@ bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) { ...@@ -468,7 +468,7 @@ bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) {
} }
if (code == 0 && pEntry != NULL) { if (code == 0 && pEntry != NULL) {
if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == pSyncNode->pRaftStore->currentTerm) { if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == pSyncNode->raftStore.currentTerm) {
ready = true; ready = true;
} }
...@@ -736,7 +736,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_ ...@@ -736,7 +736,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_
int32_t code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex); int32_t code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex);
if (code == 0) { if (code == 0) {
pMsg->info.conn.applyIndex = retIndex; pMsg->info.conn.applyIndex = retIndex;
pMsg->info.conn.applyTerm = pSyncNode->pRaftStore->currentTerm; pMsg->info.conn.applyTerm = pSyncNode->raftStore.currentTerm;
sTrace("vgId:%d, propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex, sTrace("vgId:%d, propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
TMSG_INFO(pMsg->msgType)); TMSG_INFO(pMsg->msgType));
return 1; return 1;
...@@ -983,8 +983,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { ...@@ -983,8 +983,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
// init TLA+ server vars // init TLA+ server vars
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
pSyncNode->pRaftStore = raftStoreOpen(pSyncNode->raftStorePath); if (raftStoreReadFile(pSyncNode) != 0) {
if (pSyncNode->pRaftStore == NULL) {
sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath); sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
goto _error; goto _error;
} }
...@@ -1184,7 +1183,7 @@ int32_t syncNodeRestore(SSyncNode* pSyncNode) { ...@@ -1184,7 +1183,7 @@ int32_t syncNodeRestore(SSyncNode* pSyncNode) {
int32_t syncNodeStart(SSyncNode* pSyncNode) { int32_t syncNodeStart(SSyncNode* pSyncNode) {
// start raft // start raft
if (pSyncNode->replicaNum == 1) { if (pSyncNode->replicaNum == 1) {
raftStoreNextTerm(pSyncNode->pRaftStore); raftStoreNextTerm(pSyncNode);
syncNodeBecomeLeader(pSyncNode, "one replica start"); syncNodeBecomeLeader(pSyncNode, "one replica start");
// Raft 3.6.2 Committing entries from previous terms // Raft 3.6.2 Committing entries from previous terms
...@@ -1202,7 +1201,7 @@ int32_t syncNodeStart(SSyncNode* pSyncNode) { ...@@ -1202,7 +1201,7 @@ int32_t syncNodeStart(SSyncNode* pSyncNode) {
void syncNodeStartOld(SSyncNode* pSyncNode) { void syncNodeStartOld(SSyncNode* pSyncNode) {
// start raft // start raft
if (pSyncNode->replicaNum == 1) { if (pSyncNode->replicaNum == 1) {
raftStoreNextTerm(pSyncNode->pRaftStore); raftStoreNextTerm(pSyncNode);
syncNodeBecomeLeader(pSyncNode, "one replica start"); syncNodeBecomeLeader(pSyncNode, "one replica start");
// Raft 3.6.2 Committing entries from previous terms // Raft 3.6.2 Committing entries from previous terms
...@@ -1288,10 +1287,6 @@ void syncNodeClose(SSyncNode* pSyncNode) { ...@@ -1288,10 +1287,6 @@ void syncNodeClose(SSyncNode* pSyncNode) {
if (pSyncNode == NULL) return; if (pSyncNode == NULL) return;
sNInfo(pSyncNode, "sync close, node:%p", pSyncNode); sNInfo(pSyncNode, "sync close, node:%p", pSyncNode);
int32_t ret = raftStoreClose(pSyncNode->pRaftStore);
ASSERT(ret == 0);
pSyncNode->pRaftStore = NULL;
syncNodeLogReplMgrDestroy(pSyncNode); syncNodeLogReplMgrDestroy(pSyncNode);
syncRespMgrDestroy(pSyncNode->pSyncRespMgr); syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
pSyncNode->pSyncRespMgr = NULL; pSyncNode->pSyncRespMgr = NULL;
...@@ -1714,39 +1709,39 @@ _END: ...@@ -1714,39 +1709,39 @@ _END:
// raft state change -------------- // raft state change --------------
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
if (term > pSyncNode->pRaftStore->currentTerm) { if (term > pSyncNode->raftStore.currentTerm) {
raftStoreSetTerm(pSyncNode->pRaftStore, term); raftStoreSetTerm(pSyncNode, term);
char tmpBuf[64]; char tmpBuf[64];
snprintf(tmpBuf, sizeof(tmpBuf), "update term to %" PRId64, term); snprintf(tmpBuf, sizeof(tmpBuf), "update term to %" PRId64, term);
syncNodeBecomeFollower(pSyncNode, tmpBuf); syncNodeBecomeFollower(pSyncNode, tmpBuf);
raftStoreClearVote(pSyncNode->pRaftStore); raftStoreClearVote(pSyncNode);
} }
} }
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) { void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
if (term > pSyncNode->pRaftStore->currentTerm) { if (term > pSyncNode->raftStore.currentTerm) {
raftStoreSetTerm(pSyncNode->pRaftStore, term); raftStoreSetTerm(pSyncNode, term);
} }
} }
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) { void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) {
if (pSyncNode->pRaftStore->currentTerm > newTerm) { if (pSyncNode->raftStore.currentTerm > newTerm) {
sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm,
pSyncNode->pRaftStore->currentTerm); pSyncNode->raftStore.currentTerm);
return; return;
} }
do { do {
sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm,
pSyncNode->pRaftStore->currentTerm); pSyncNode->raftStore.currentTerm);
} while (0); } while (0);
if (pSyncNode->pRaftStore->currentTerm < newTerm) { if (pSyncNode->raftStore.currentTerm < newTerm) {
raftStoreSetTerm(pSyncNode->pRaftStore, newTerm); raftStoreSetTerm(pSyncNode, newTerm);
char tmpBuf[64]; char tmpBuf[64];
snprintf(tmpBuf, sizeof(tmpBuf), "step down, update term to %" PRId64, newTerm); snprintf(tmpBuf, sizeof(tmpBuf), "step down, update term to %" PRId64, newTerm);
syncNodeBecomeFollower(pSyncNode, tmpBuf); syncNodeBecomeFollower(pSyncNode, tmpBuf);
raftStoreClearVote(pSyncNode->pRaftStore); raftStoreClearVote(pSyncNode);
} else { } else {
if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) { if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
...@@ -1904,7 +1899,7 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) { ...@@ -1904,7 +1899,7 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
ASSERT(lastIndex >= 0); ASSERT(lastIndex >= 0);
sInfo("vgId:%d, become leader. term: %" PRId64 ", commit index: %" PRId64 ", last index: %" PRId64 "", sInfo("vgId:%d, become leader. term: %" PRId64 ", commit index: %" PRId64 ", last index: %" PRId64 "",
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, lastIndex); pSyncNode->vgId, pSyncNode->raftStore.currentTerm, pSyncNode->commitIndex, lastIndex);
} }
void syncNodeCandidate2LeaderOld(SSyncNode* pSyncNode) { void syncNodeCandidate2LeaderOld(SSyncNode* pSyncNode) {
...@@ -1937,7 +1932,7 @@ void syncNodeFollower2Candidate(SSyncNode* pSyncNode) { ...@@ -1937,7 +1932,7 @@ void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE; pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
sInfo("vgId:%d, become candidate from follower. term: %" PRId64 ", commit index: %" PRId64 ", last index: %" PRId64, sInfo("vgId:%d, become candidate from follower. term: %" PRId64 ", commit index: %" PRId64 ", last index: %" PRId64,
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, lastIndex); pSyncNode->vgId, pSyncNode->raftStore.currentTerm, pSyncNode->commitIndex, lastIndex);
sNTrace(pSyncNode, "follower to candidate"); sNTrace(pSyncNode, "follower to candidate");
} }
...@@ -1947,7 +1942,7 @@ void syncNodeLeader2Follower(SSyncNode* pSyncNode) { ...@@ -1947,7 +1942,7 @@ void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
syncNodeBecomeFollower(pSyncNode, "leader to follower"); syncNodeBecomeFollower(pSyncNode, "leader to follower");
SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
sInfo("vgId:%d, become follower from leader. term: %" PRId64 ", commit index: %" PRId64 ", last index: %" PRId64, sInfo("vgId:%d, become follower from leader. term: %" PRId64 ", commit index: %" PRId64 ", last index: %" PRId64,
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, lastIndex); pSyncNode->vgId, pSyncNode->raftStore.currentTerm, pSyncNode->commitIndex, lastIndex);
sNTrace(pSyncNode, "leader to follower"); sNTrace(pSyncNode, "leader to follower");
} }
...@@ -1957,7 +1952,7 @@ void syncNodeCandidate2Follower(SSyncNode* pSyncNode) { ...@@ -1957,7 +1952,7 @@ void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
syncNodeBecomeFollower(pSyncNode, "candidate to follower"); syncNodeBecomeFollower(pSyncNode, "candidate to follower");
SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
sInfo("vgId:%d, become follower from candidate. term: %" PRId64 ", commit index: %" PRId64 ", last index: %" PRId64, sInfo("vgId:%d, become follower from candidate. term: %" PRId64 ", commit index: %" PRId64 ", last index: %" PRId64,
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, lastIndex); pSyncNode->vgId, pSyncNode->raftStore.currentTerm, pSyncNode->commitIndex, lastIndex);
sNTrace(pSyncNode, "candidate to follower"); sNTrace(pSyncNode, "candidate to follower");
} }
...@@ -1965,15 +1960,15 @@ void syncNodeCandidate2Follower(SSyncNode* pSyncNode) { ...@@ -1965,15 +1960,15 @@ void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
// just called by syncNodeVoteForSelf // just called by syncNodeVoteForSelf
// need assert // need assert
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) { void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
ASSERT(term == pSyncNode->pRaftStore->currentTerm); ASSERT(term == pSyncNode->raftStore.currentTerm);
ASSERT(!raftStoreHasVoted(pSyncNode->pRaftStore)); ASSERT(!raftStoreHasVoted(pSyncNode));
raftStoreVote(pSyncNode->pRaftStore, pRaftId); raftStoreVote(pSyncNode, pRaftId);
} }
// simulate get vote from outside // simulate get vote from outside
void syncNodeVoteForSelf(SSyncNode* pSyncNode) { void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
syncNodeVoteForTerm(pSyncNode, pSyncNode->pRaftStore->currentTerm, &pSyncNode->myRaftId); syncNodeVoteForTerm(pSyncNode, pSyncNode->raftStore.currentTerm, &pSyncNode->myRaftId);
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
int32_t ret = syncBuildRequestVoteReply(&rpcMsg, pSyncNode->vgId); int32_t ret = syncBuildRequestVoteReply(&rpcMsg, pSyncNode->vgId);
...@@ -1982,7 +1977,7 @@ void syncNodeVoteForSelf(SSyncNode* pSyncNode) { ...@@ -1982,7 +1977,7 @@ void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
SyncRequestVoteReply* pMsg = rpcMsg.pCont; SyncRequestVoteReply* pMsg = rpcMsg.pCont;
pMsg->srcId = pSyncNode->myRaftId; pMsg->srcId = pSyncNode->myRaftId;
pMsg->destId = pSyncNode->myRaftId; pMsg->destId = pSyncNode->myRaftId;
pMsg->term = pSyncNode->pRaftStore->currentTerm; pMsg->term = pSyncNode->raftStore.currentTerm;
pMsg->voteGranted = true; pMsg->voteGranted = true;
voteGrantedVote(pSyncNode->pVotesGranted, pMsg); voteGrantedVote(pSyncNode->pVotesGranted, pMsg);
...@@ -2272,13 +2267,6 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { ...@@ -2272,13 +2267,6 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
return; return;
} }
if (pSyncNode->pRaftStore == NULL) {
syncNodeRelease(pSyncNode);
syncHbTimerDataRelease(pData);
sError("vgId:%d, hb timer raft store already stop", pSyncNode->vgId);
return;
}
// sTrace("vgId:%d, eq peer hb timer", pSyncNode->vgId); // sTrace("vgId:%d, eq peer hb timer", pSyncNode->vgId);
if (pSyncNode->replicaNum > 1) { if (pSyncNode->replicaNum > 1) {
...@@ -2302,7 +2290,7 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { ...@@ -2302,7 +2290,7 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
SyncHeartbeat* pSyncMsg = rpcMsg.pCont; SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
pSyncMsg->srcId = pSyncNode->myRaftId; pSyncMsg->srcId = pSyncNode->myRaftId;
pSyncMsg->destId = pData->destId; pSyncMsg->destId = pData->destId;
pSyncMsg->term = pSyncNode->pRaftStore->currentTerm; pSyncMsg->term = pSyncNode->raftStore.currentTerm;
pSyncMsg->commitIndex = pSyncNode->commitIndex; pSyncMsg->commitIndex = pSyncNode->commitIndex;
pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode); pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
pSyncMsg->privateTerm = 0; pSyncMsg->privateTerm = 0;
...@@ -2348,7 +2336,7 @@ static int32_t syncNodeEqNoop(SSyncNode* pNode) { ...@@ -2348,7 +2336,7 @@ static int32_t syncNodeEqNoop(SSyncNode* pNode) {
} }
SyncIndex index = pNode->pLogStore->syncLogWriteIndex(pNode->pLogStore); SyncIndex index = pNode->pLogStore->syncLogWriteIndex(pNode->pLogStore);
SyncTerm term = pNode->pRaftStore->currentTerm; SyncTerm term = pNode->raftStore.currentTerm;
SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, pNode->vgId); SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, pNode->vgId);
if (pEntry == NULL) return -1; if (pEntry == NULL) return -1;
...@@ -2394,8 +2382,7 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) { ...@@ -2394,8 +2382,7 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
if (syncLogBufferAppend(ths->pLogBuf, ths, pEntry) < 0) { if (syncLogBufferAppend(ths->pLogBuf, ths, pEntry) < 0) {
sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index); sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index);
terrno = TSDB_CODE_SYN_BUFFER_FULL; terrno = TSDB_CODE_SYN_BUFFER_FULL;
(void)syncLogFsmExecute(ths, ths->pFsm, ths->state, ths->pRaftStore->currentTerm, pEntry, (void)syncLogFsmExecute(ths, ths->pFsm, ths->state, ths->raftStore.currentTerm, pEntry, TSDB_CODE_SYN_BUFFER_FULL);
TSDB_CODE_SYN_BUFFER_FULL);
syncEntryDestroy(pEntry); syncEntryDestroy(pEntry);
return -1; return -1;
} }
...@@ -2468,7 +2455,7 @@ bool syncNodeSnapshotRecving(SSyncNode* pSyncNode) { ...@@ -2468,7 +2455,7 @@ bool syncNodeSnapshotRecving(SSyncNode* pSyncNode) {
static int32_t syncNodeAppendNoop(SSyncNode* ths) { static int32_t syncNodeAppendNoop(SSyncNode* ths) {
SyncIndex index = syncLogBufferGetEndIndex(ths->pLogBuf); SyncIndex index = syncLogBufferGetEndIndex(ths->pLogBuf);
SyncTerm term = ths->pRaftStore->currentTerm; SyncTerm term = ths->raftStore.currentTerm;
SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId); SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
if (pEntry == NULL) { if (pEntry == NULL) {
...@@ -2484,7 +2471,7 @@ static int32_t syncNodeAppendNoopOld(SSyncNode* ths) { ...@@ -2484,7 +2471,7 @@ static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
int32_t ret = 0; int32_t ret = 0;
SyncIndex index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore); SyncIndex index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
SyncTerm term = ths->pRaftStore->currentTerm; SyncTerm term = ths->raftStore.currentTerm;
SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId); SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
ASSERT(pEntry != NULL); ASSERT(pEntry != NULL);
...@@ -2526,12 +2513,12 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -2526,12 +2513,12 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
SyncHeartbeatReply* pMsgReply = rpcMsg.pCont; SyncHeartbeatReply* pMsgReply = rpcMsg.pCont;
pMsgReply->destId = pMsg->srcId; pMsgReply->destId = pMsg->srcId;
pMsgReply->srcId = ths->myRaftId; pMsgReply->srcId = ths->myRaftId;
pMsgReply->term = ths->pRaftStore->currentTerm; pMsgReply->term = ths->raftStore.currentTerm;
pMsgReply->privateTerm = 8864; // magic number pMsgReply->privateTerm = 8864; // magic number
pMsgReply->startTime = ths->startTime; pMsgReply->startTime = ths->startTime;
pMsgReply->timeStamp = tsMs; pMsgReply->timeStamp = tsMs;
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) { if (pMsg->term == ths->raftStore.currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs); syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs);
syncNodeResetElectTimer(ths); syncNodeResetElectTimer(ths);
...@@ -2560,7 +2547,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -2560,7 +2547,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
} }
} }
if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) { if (pMsg->term >= ths->raftStore.currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) {
// syncNodeStepDown(ths, pMsg->term); // syncNodeStepDown(ths, pMsg->term);
SRpcMsg rpcMsgLocalCmd = {0}; SRpcMsg rpcMsgLocalCmd = {0};
(void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId); (void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId);
...@@ -2687,7 +2674,7 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn ...@@ -2687,7 +2674,7 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn
int32_t code = 0; int32_t code = 0;
SyncIndex index = syncLogBufferGetEndIndex(ths->pLogBuf); SyncIndex index = syncLogBufferGetEndIndex(ths->pLogBuf);
SyncTerm term = ths->pRaftStore->currentTerm; SyncTerm term = ths->raftStore.currentTerm;
SSyncRaftEntry* pEntry = NULL; SSyncRaftEntry* pEntry = NULL;
if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
pEntry = syncEntryBuildFromClientRequest(pMsg->pCont, term, index); pEntry = syncEntryBuildFromClientRequest(pMsg->pCont, term, index);
...@@ -2721,7 +2708,7 @@ int32_t syncNodeOnClientRequestOld(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRe ...@@ -2721,7 +2708,7 @@ int32_t syncNodeOnClientRequestOld(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRe
int32_t code = 0; int32_t code = 0;
SyncIndex index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore); SyncIndex index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
SyncTerm term = ths->pRaftStore->currentTerm; SyncTerm term = ths->raftStore.currentTerm;
SSyncRaftEntry* pEntry; SSyncRaftEntry* pEntry;
if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
...@@ -2755,7 +2742,7 @@ int32_t syncNodeOnClientRequestOld(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRe ...@@ -2755,7 +2742,7 @@ int32_t syncNodeOnClientRequestOld(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRe
.state = ths->state, .state = ths->state,
.seqNum = pEntry->seqNum, .seqNum = pEntry->seqNum,
.term = pEntry->term, .term = pEntry->term,
.currentTerm = ths->pRaftStore->currentTerm, .currentTerm = ths->raftStore.currentTerm,
.flag = 0, .flag = 0,
}; };
ths->pFsm->FpCommitCb(ths->pFsm, pMsg, &cbMeta); ths->pFsm->FpCommitCb(ths->pFsm, pMsg, &cbMeta);
...@@ -2833,7 +2820,7 @@ int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* p ...@@ -2833,7 +2820,7 @@ int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* p
return 0; return 0;
} }
if (pEntry->term < ths->pRaftStore->currentTerm) { if (pEntry->term < ths->raftStore.currentTerm) {
sNTrace(ths, "little term:%" PRId64 ", can not do leader transfer", pEntry->term); sNTrace(ths, "little term:%" PRId64 ", can not do leader transfer", pEntry->term);
return 0; return 0;
} }
...@@ -2871,7 +2858,7 @@ int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* p ...@@ -2871,7 +2858,7 @@ int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* p
if (ths->pFsm->FpLeaderTransferCb != NULL) { if (ths->pFsm->FpLeaderTransferCb != NULL) {
SFsmCbMeta cbMeta = { SFsmCbMeta cbMeta = {
.code = 0, .code = 0,
.currentTerm = ths->pRaftStore->currentTerm, .currentTerm = ths->raftStore.currentTerm,
.flag = 0, .flag = 0,
.index = pEntry->index, .index = pEntry->index,
.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index), .lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index),
...@@ -2987,7 +2974,7 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde ...@@ -2987,7 +2974,7 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde
.state = ths->state, .state = ths->state,
.seqNum = pEntry->seqNum, .seqNum = pEntry->seqNum,
.term = pEntry->term, .term = pEntry->term,
.currentTerm = ths->pRaftStore->currentTerm, .currentTerm = ths->raftStore.currentTerm,
.flag = flag, .flag = flag,
}; };
......
...@@ -176,7 +176,7 @@ int32_t syncBuildAppendEntriesFromRaftLog(SSyncNode* pNode, SSyncRaftEntry* pEnt ...@@ -176,7 +176,7 @@ int32_t syncBuildAppendEntriesFromRaftLog(SSyncNode* pNode, SSyncRaftEntry* pEnt
pMsg->prevLogTerm = prevLogTerm; pMsg->prevLogTerm = prevLogTerm;
pMsg->vgId = pNode->vgId; pMsg->vgId = pNode->vgId;
pMsg->srcId = pNode->myRaftId; pMsg->srcId = pNode->myRaftId;
pMsg->term = pNode->pRaftStore->currentTerm; pMsg->term = pNode->raftStore.currentTerm;
pMsg->commitIndex = pNode->commitIndex; pMsg->commitIndex = pNode->commitIndex;
pMsg->privateTerm = 0; pMsg->privateTerm = 0;
return 0; return 0;
......
...@@ -285,9 +285,9 @@ SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf) { ...@@ -285,9 +285,9 @@ SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf) {
int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm) { int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm) {
taosThreadMutexLock(&pBuf->mutex); taosThreadMutexLock(&pBuf->mutex);
syncLogBufferValidate(pBuf); syncLogBufferValidate(pBuf);
int32_t ret = -1; int32_t ret = -1;
SyncIndex index = pEntry->index; SyncIndex index = pEntry->index;
SyncIndex prevIndex = pEntry->index - 1; SyncIndex prevIndex = pEntry->index - 1;
SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTermWithoutLock(pBuf); SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTermWithoutLock(pBuf);
SSyncRaftEntry* pExist = NULL; SSyncRaftEntry* pExist = NULL;
bool inBuf = true; bool inBuf = true;
...@@ -509,7 +509,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm ...@@ -509,7 +509,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
SSyncLogStore* pLogStore = pNode->pLogStore; SSyncLogStore* pLogStore = pNode->pLogStore;
SSyncFSM* pFsm = pNode->pFsm; SSyncFSM* pFsm = pNode->pFsm;
ESyncState role = pNode->state; ESyncState role = pNode->state;
SyncTerm term = pNode->pRaftStore->currentTerm; SyncTerm term = pNode->raftStore.currentTerm;
SyncGroupId vgId = pNode->vgId; SyncGroupId vgId = pNode->vgId;
int32_t ret = -1; int32_t ret = -1;
int64_t upperIndex = TMIN(commitIndex, pBuf->matchIndex); int64_t upperIndex = TMIN(commitIndex, pBuf->matchIndex);
...@@ -571,7 +571,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm ...@@ -571,7 +571,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
_out: _out:
// mark as restored if needed // mark as restored if needed
if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex && pEntry != NULL && if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex && pEntry != NULL &&
pNode->pRaftStore->currentTerm <= pEntry->term) { pNode->raftStore.currentTerm <= pEntry->term) {
pNode->pFsm->FpRestoreFinishCb(pNode->pFsm); pNode->pFsm->FpRestoreFinishCb(pNode->pFsm);
pNode->restoreFinish = true; pNode->restoreFinish = true;
sInfo("vgId:%d, restore finished. log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, sInfo("vgId:%d, restore finished. log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId,
...@@ -614,9 +614,9 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { ...@@ -614,9 +614,9 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
return -1; return -1;
} }
int32_t ret = -1; int32_t ret = -1;
bool retried = false; bool retried = false;
int64_t retryWaitMs = syncLogGetRetryBackoffTimeMs(pMgr); int64_t retryWaitMs = syncLogGetRetryBackoffTimeMs(pMgr);
int64_t nowMs = taosGetMonoTimestampMs(); int64_t nowMs = taosGetMonoTimestampMs();
int count = 0; int count = 0;
int64_t firstIndex = -1; int64_t firstIndex = -1;
...@@ -807,9 +807,9 @@ int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode ...@@ -807,9 +807,9 @@ int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode
} }
(void)syncLogReplMgrReset(pMgr); (void)syncLogReplMgrReset(pMgr);
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
bool barrier = false; bool barrier = false;
SyncTerm term = -1; SyncTerm term = -1;
if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId, sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId,
terrstr(), index, pDestId->addr); terrstr(), index, pDestId->addr);
...@@ -836,11 +836,11 @@ int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode ...@@ -836,11 +836,11 @@ int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode
int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
ASSERT(pMgr->restored); ASSERT(pMgr->restored);
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
int32_t batchSize = TMAX(1, pMgr->size >> (4 + pMgr->retryBackoff)); int32_t batchSize = TMAX(1, pMgr->size >> (4 + pMgr->retryBackoff));
int32_t count = 0; int32_t count = 0;
int64_t nowMs = taosGetMonoTimestampMs(); int64_t nowMs = taosGetMonoTimestampMs();
int64_t limit = pMgr->size >> 1; int64_t limit = pMgr->size >> 1;
SyncTerm term = -1; SyncTerm term = -1;
SyncIndex firstIndex = -1; SyncIndex firstIndex = -1;
...@@ -891,13 +891,13 @@ int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* p ...@@ -891,13 +891,13 @@ int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* p
int32_t syncLogReplMgrProcessReplyInNormalMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) { int32_t syncLogReplMgrProcessReplyInNormalMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
ASSERT(pMgr->restored == true); ASSERT(pMgr->restored == true);
if (pMgr->startIndex <= pMsg->lastSendIndex && pMsg->lastSendIndex < pMgr->endIndex) { if (pMgr->startIndex <= pMsg->lastSendIndex && pMsg->lastSendIndex < pMgr->endIndex) {
if (pMgr->startIndex < pMgr->matchIndex && pMgr->retryBackoff > 0) { if (pMgr->startIndex < pMgr->matchIndex && pMgr->retryBackoff > 0) {
int64_t firstSentMs = pMgr->states[pMgr->startIndex % pMgr->size].timeMs; int64_t firstSentMs = pMgr->states[pMgr->startIndex % pMgr->size].timeMs;
int64_t lastSentMs = pMgr->states[(pMgr->endIndex - 1) % pMgr->size].timeMs; int64_t lastSentMs = pMgr->states[(pMgr->endIndex - 1) % pMgr->size].timeMs;
int64_t timeDiffMs = lastSentMs - firstSentMs; int64_t timeDiffMs = lastSentMs - firstSentMs;
if (timeDiffMs > 0 && timeDiffMs < (SYNC_LOG_REPL_RETRY_WAIT_MS << (pMgr->retryBackoff - 1))) { if (timeDiffMs > 0 && timeDiffMs < (SYNC_LOG_REPL_RETRY_WAIT_MS << (pMgr->retryBackoff - 1))) {
pMgr->retryBackoff -= 1; pMgr->retryBackoff -= 1;
} }
} }
pMgr->states[pMsg->lastSendIndex % pMgr->size].acked = true; pMgr->states[pMsg->lastSendIndex % pMgr->size].acked = true;
pMgr->matchIndex = TMAX(pMgr->matchIndex, pMsg->matchIndex); pMgr->matchIndex = TMAX(pMgr->matchIndex, pMsg->matchIndex);
......
...@@ -16,158 +16,161 @@ ...@@ -16,158 +16,161 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "syncRaftStore.h" #include "syncRaftStore.h"
#include "syncUtil.h" #include "syncUtil.h"
#include "tjson.h"
// private function static int32_t raftStoreDecode(const SJson *pJson, SRaftStore *pStore) {
static int32_t raftStoreInit(SRaftStore *pRaftStore); int32_t code = 0;
static bool raftStoreFileExist(char *path);
static int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len);
static int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len);
// public function tjsonGetNumberValue(pJson, "current_term", pStore->currentTerm, code);
SRaftStore *raftStoreOpen(const char *path) { if (code < 0) return -1;
int32_t ret; tjsonGetNumberValue(pJson, "vote_for_addr", pStore->voteFor.addr, code);
if (code < 0) return -1;
tjsonGetInt32ValueFromDouble(pJson, "vote_for_vgid", pStore->voteFor.vgId, code);
if (code < 0) return -1;
SRaftStore *pRaftStore = taosMemoryCalloc(1, sizeof(SRaftStore));
if (pRaftStore == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
snprintf(pRaftStore->path, sizeof(pRaftStore->path), "%s", path);
if (!raftStoreFileExist(pRaftStore->path)) {
ret = raftStoreInit(pRaftStore);
ASSERT(ret == 0);
}
char storeBuf[RAFT_STORE_BLOCK_SIZE] = {0};
pRaftStore->pFile = taosOpenFile(path, TD_FILE_READ | TD_FILE_WRITE);
ASSERT(pRaftStore->pFile != NULL);
int len = taosReadFile(pRaftStore->pFile, storeBuf, RAFT_STORE_BLOCK_SIZE);
ASSERT(len > 0);
ret = raftStoreDeserialize(pRaftStore, storeBuf, len);
ASSERT(ret == 0);
return pRaftStore;
}
static int32_t raftStoreInit(SRaftStore *pRaftStore) {
ASSERT(pRaftStore != NULL);
pRaftStore->pFile = taosOpenFile(pRaftStore->path, TD_FILE_CREATE | TD_FILE_WRITE);
ASSERT(pRaftStore->pFile != NULL);
pRaftStore->currentTerm = 0;
pRaftStore->voteFor.addr = 0;
pRaftStore->voteFor.vgId = 0;
int32_t ret = raftStorePersist(pRaftStore);
ASSERT(ret == 0);
taosCloseFile(&pRaftStore->pFile);
return 0;
}
int32_t raftStoreClose(SRaftStore *pRaftStore) {
if (pRaftStore == NULL) return 0;
taosCloseFile(&pRaftStore->pFile);
taosMemoryFree(pRaftStore);
pRaftStore = NULL;
return 0; return 0;
} }
int32_t raftStorePersist(SRaftStore *pRaftStore) { int32_t raftStoreReadFile(SSyncNode *pNode) {
ASSERT(pRaftStore != NULL); int32_t code = -1;
TdFilePtr pFile = NULL;
int32_t ret; char *pData = NULL;
char storeBuf[RAFT_STORE_BLOCK_SIZE] = {0}; SJson *pJson = NULL;
ret = raftStoreSerialize(pRaftStore, storeBuf, sizeof(storeBuf)); const char *file = pNode->raftStorePath;
ASSERT(ret == 0); SRaftStore *pStore = &pNode->raftStore;
if (taosStatFile(file, NULL, NULL) < 0) {
sInfo("vgId:%d, raft store file:%s not exist, use default value", pNode->vgId, file);
pStore->currentTerm = 0;
pStore->voteFor.addr = 0;
pStore->voteFor.vgId = 0;
return raftStoreWriteFile(pNode);
}
taosLSeekFile(pRaftStore->pFile, 0, SEEK_SET); pFile = taosOpenFile(file, TD_FILE_READ);
if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
sError("vgId:%d, failed to open raft store file:%s since %s", pNode->vgId, file, terrstr());
goto _OVER;
}
ret = taosWriteFile(pRaftStore->pFile, storeBuf, sizeof(storeBuf)); int64_t size = 0;
ASSERT(ret == RAFT_STORE_BLOCK_SIZE); if (taosFStatFile(pFile, &size, NULL) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
sError("vgId:%d, failed to fstat raft store file:%s since %s", pNode->vgId, file, terrstr());
goto _OVER;
}
taosFsyncFile(pRaftStore->pFile); pData = taosMemoryMalloc(size + 1);
return 0; if (pData == NULL) {
} terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
static bool raftStoreFileExist(char *path) { if (taosReadFile(pFile, pData, size) != size) {
bool b = taosStatFile(path, NULL, NULL) >= 0; terrno = TAOS_SYSTEM_ERROR(errno);
return b; sError("vgId:%d, failed to read raft store file:%s since %s", pNode->vgId, file, terrstr());
} goto _OVER;
}
int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) { pData[size] = '\0';
ASSERT(pRaftStore != NULL);
cJSON *pRoot = cJSON_CreateObject(); pJson = tjsonParse(pData);
if (pJson == NULL) {
terrno = TSDB_CODE_INVALID_JSON_FORMAT;
goto _OVER;
}
char u64Buf[128] = {0}; if (raftStoreDecode(pJson, pStore) < 0) {
snprintf(u64Buf, sizeof(u64Buf), "%" PRIu64 "", pRaftStore->currentTerm); terrno = TSDB_CODE_INVALID_JSON_FORMAT;
cJSON_AddStringToObject(pRoot, "current_term", u64Buf); goto _OVER;
}
snprintf(u64Buf, sizeof(u64Buf), "%" PRIu64 "", pRaftStore->voteFor.addr); code = 0;
cJSON_AddStringToObject(pRoot, "vote_for_addr", u64Buf); sInfo("vgId:%d, succceed to read raft store file %s", pNode->vgId, file);
cJSON_AddNumberToObject(pRoot, "vote_for_vgid", pRaftStore->voteFor.vgId); _OVER:
if (pData != NULL) taosMemoryFree(pData);
if (pJson != NULL) cJSON_Delete(pJson);
if (pFile != NULL) taosCloseFile(&pFile);
char *serialized = cJSON_Print(pRoot); if (code != 0) {
int len2 = strlen(serialized); sError("vgId:%d, failed to read raft store file:%s since %s", pNode->vgId, file, terrstr());
ASSERT(len2 < len); }
memset(buf, 0, len); return code;
snprintf(buf, len, "%s", serialized); }
taosMemoryFree(serialized);
cJSON_Delete(pRoot); static int32_t raftStoreEncode(SJson *pJson, SRaftStore *pStore) {
if (tjsonAddIntegerToObject(pJson, "current_term", pStore->currentTerm) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "vote_for_addr", pStore->voteFor.addr) < 0) return -1;
if (tjsonAddDoubleToObject(pJson, "vote_for_vgid", pStore->voteFor.vgId) < 0) return -1;
return 0; return 0;
} }
int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) { int32_t raftStoreWriteFile(SSyncNode *pNode) {
ASSERT(pRaftStore != NULL); int32_t code = -1;
char *buffer = NULL;
ASSERT(len > 0 && len <= RAFT_STORE_BLOCK_SIZE); SJson *pJson = NULL;
cJSON *pRoot = cJSON_Parse(buf); TdFilePtr pFile = NULL;
const char *realfile = pNode->raftStorePath;
cJSON *pCurrentTerm = cJSON_GetObjectItem(pRoot, "current_term"); SRaftStore *pStore = &pNode->raftStore;
ASSERT(cJSON_IsString(pCurrentTerm)); char file[PATH_MAX] = {0};
sscanf(pCurrentTerm->valuestring, "%" PRIu64 "", &(pRaftStore->currentTerm)); snprintf(file, sizeof(file), "%s.bak", realfile);
cJSON *pVoteForAddr = cJSON_GetObjectItem(pRoot, "vote_for_addr"); terrno = TSDB_CODE_OUT_OF_MEMORY;
ASSERT(cJSON_IsString(pVoteForAddr)); pJson = tjsonCreateObject();
sscanf(pVoteForAddr->valuestring, "%" PRIu64 "", &(pRaftStore->voteFor.addr)); if (pJson == NULL) goto _OVER;
if (raftStoreEncode(pJson, pStore) != 0) goto _OVER;
cJSON *pVoteForVgid = cJSON_GetObjectItem(pRoot, "vote_for_vgid"); buffer = tjsonToString(pJson);
pRaftStore->voteFor.vgId = pVoteForVgid->valueint; if (buffer == NULL) goto _OVER;
terrno = 0;
cJSON_Delete(pRoot);
return 0; pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) goto _OVER;
int32_t len = strlen(buffer);
if (taosWriteFile(pFile, buffer, len) <= 0) goto _OVER;
if (taosFsyncFile(pFile) < 0) goto _OVER;
taosCloseFile(&pFile);
if (taosRenameFile(file, realfile) != 0) goto _OVER;
code = 0;
sInfo("vgId:%d, succeed to write raft store file:%s, len:%d", pNode->vgId, realfile, len);
_OVER:
if (pJson != NULL) tjsonDelete(pJson);
if (buffer != NULL) taosMemoryFree(buffer);
if (pFile != NULL) taosCloseFile(&pFile);
if (code != 0) {
if (terrno == 0) terrno = TAOS_SYSTEM_ERROR(errno);
sError("vgId:%d, failed to write raft store file:%s since %s", pNode->vgId, realfile, terrstr());
}
return code;
} }
bool raftStoreHasVoted(SRaftStore *pRaftStore) { bool raftStoreHasVoted(SSyncNode *pNode) {
bool b = syncUtilEmptyId(&(pRaftStore->voteFor)); bool b = syncUtilEmptyId(&pNode->raftStore.voteFor);
return (!b); return (!b);
} }
void raftStoreVote(SRaftStore *pRaftStore, SRaftId *pRaftId) { void raftStoreVote(SSyncNode *pNode, SRaftId *pRaftId) {
ASSERT(!syncUtilEmptyId(pRaftId)); pNode->raftStore.voteFor = *pRaftId;
pRaftStore->voteFor = *pRaftId; (void)raftStoreWriteFile(pNode);
raftStorePersist(pRaftStore);
} }
void raftStoreClearVote(SRaftStore *pRaftStore) { void raftStoreClearVote(SSyncNode *pNode) {
pRaftStore->voteFor = EMPTY_RAFT_ID; pNode->raftStore.voteFor = EMPTY_RAFT_ID;
raftStorePersist(pRaftStore); (void)raftStoreWriteFile(pNode);
} }
void raftStoreNextTerm(SRaftStore *pRaftStore) { void raftStoreNextTerm(SSyncNode *pNode) {
++(pRaftStore->currentTerm); pNode->raftStore.currentTerm++;
raftStorePersist(pRaftStore); (void)raftStoreWriteFile(pNode);
} }
void raftStoreSetTerm(SRaftStore *pRaftStore, SyncTerm term) { void raftStoreSetTerm(SSyncNode *pNode, SyncTerm term) {
pRaftStore->currentTerm = term; pNode->raftStore.currentTerm = term;
raftStorePersist(pRaftStore); (void)raftStoreWriteFile(pNode);
} }
...@@ -122,7 +122,7 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapsh ...@@ -122,7 +122,7 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapsh
ASSERT(pMsg != NULL); ASSERT(pMsg != NULL);
pMsg->srcId = pSyncNode->myRaftId; pMsg->srcId = pSyncNode->myRaftId;
pMsg->destId = *pDestId; pMsg->destId = *pDestId;
pMsg->term = pSyncNode->pRaftStore->currentTerm; pMsg->term = pSyncNode->raftStore.currentTerm;
pMsg->prevLogIndex = preLogIndex; pMsg->prevLogIndex = preLogIndex;
pMsg->prevLogTerm = preLogTerm; pMsg->prevLogTerm = preLogTerm;
pMsg->commitIndex = pSyncNode->commitIndex; pMsg->commitIndex = pSyncNode->commitIndex;
...@@ -245,7 +245,7 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) { ...@@ -245,7 +245,7 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) {
SyncHeartbeat* pSyncMsg = rpcMsg.pCont; SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
pSyncMsg->srcId = pSyncNode->myRaftId; pSyncMsg->srcId = pSyncNode->myRaftId;
pSyncMsg->destId = pSyncNode->peersId[i]; pSyncMsg->destId = pSyncNode->peersId[i];
pSyncMsg->term = pSyncNode->pRaftStore->currentTerm; pSyncMsg->term = pSyncNode->raftStore.currentTerm;
pSyncMsg->commitIndex = pSyncNode->commitIndex; pSyncMsg->commitIndex = pSyncNode->commitIndex;
pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode); pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
pSyncMsg->privateTerm = 0; pSyncMsg->privateTerm = 0;
......
...@@ -44,12 +44,12 @@ ...@@ -44,12 +44,12 @@
// /\ UNCHANGED <<state, currentTerm, candidateVars, leaderVars, logVars>> // /\ UNCHANGED <<state, currentTerm, candidateVars, leaderVars, logVars>>
// //
static bool syncNodeOnRequestVoteLogOK(SSyncNode* pSyncNode, SyncRequestVote* pMsg) { static bool syncNodeOnRequestVoteLogOK(SSyncNode* ths, SyncRequestVote* pMsg) {
SyncTerm myLastTerm = syncNodeGetLastTerm(pSyncNode); SyncTerm myLastTerm = syncNodeGetLastTerm(ths);
SyncIndex myLastIndex = syncNodeGetLastIndex(pSyncNode); SyncIndex myLastIndex = syncNodeGetLastIndex(ths);
if (pMsg->lastLogIndex < pSyncNode->commitIndex) { if (pMsg->lastLogIndex < ths->commitIndex) {
sNTrace(pSyncNode, sNTrace(ths,
"logok:0, {my-lterm:%" PRIu64 ", my-lindex:%" PRId64 ", recv-lterm:%" PRIu64 ", recv-lindex:%" PRId64 "logok:0, {my-lterm:%" PRIu64 ", my-lindex:%" PRId64 ", recv-lterm:%" PRIu64 ", recv-lindex:%" PRId64
", recv-term:%" PRIu64 "}", ", recv-term:%" PRIu64 "}",
myLastTerm, myLastIndex, pMsg->lastLogTerm, pMsg->lastLogIndex, pMsg->term); myLastTerm, myLastIndex, pMsg->lastLogTerm, pMsg->lastLogIndex, pMsg->term);
...@@ -58,7 +58,7 @@ static bool syncNodeOnRequestVoteLogOK(SSyncNode* pSyncNode, SyncRequestVote* pM ...@@ -58,7 +58,7 @@ static bool syncNodeOnRequestVoteLogOK(SSyncNode* pSyncNode, SyncRequestVote* pM
} }
if (myLastTerm == SYNC_TERM_INVALID) { if (myLastTerm == SYNC_TERM_INVALID) {
sNTrace(pSyncNode, sNTrace(ths,
"logok:0, {my-lterm:%" PRIu64 ", my-lindex:%" PRId64 ", recv-lterm:%" PRIu64 ", recv-lindex:%" PRId64 "logok:0, {my-lterm:%" PRIu64 ", my-lindex:%" PRId64 ", recv-lterm:%" PRIu64 ", recv-lindex:%" PRId64
", recv-term:%" PRIu64 "}", ", recv-term:%" PRIu64 "}",
myLastTerm, myLastIndex, pMsg->lastLogTerm, pMsg->lastLogIndex, pMsg->term); myLastTerm, myLastIndex, pMsg->lastLogTerm, pMsg->lastLogIndex, pMsg->term);
...@@ -66,7 +66,7 @@ static bool syncNodeOnRequestVoteLogOK(SSyncNode* pSyncNode, SyncRequestVote* pM ...@@ -66,7 +66,7 @@ static bool syncNodeOnRequestVoteLogOK(SSyncNode* pSyncNode, SyncRequestVote* pM
} }
if (pMsg->lastLogTerm > myLastTerm) { if (pMsg->lastLogTerm > myLastTerm) {
sNTrace(pSyncNode, sNTrace(ths,
"logok:1, {my-lterm:%" PRIu64 ", my-lindex:%" PRId64 ", recv-lterm:%" PRIu64 ", recv-lindex:%" PRId64 "logok:1, {my-lterm:%" PRIu64 ", my-lindex:%" PRId64 ", recv-lterm:%" PRIu64 ", recv-lindex:%" PRId64
", recv-term:%" PRIu64 "}", ", recv-term:%" PRIu64 "}",
myLastTerm, myLastIndex, pMsg->lastLogTerm, pMsg->lastLogIndex, pMsg->term); myLastTerm, myLastIndex, pMsg->lastLogTerm, pMsg->lastLogIndex, pMsg->term);
...@@ -74,14 +74,14 @@ static bool syncNodeOnRequestVoteLogOK(SSyncNode* pSyncNode, SyncRequestVote* pM ...@@ -74,14 +74,14 @@ static bool syncNodeOnRequestVoteLogOK(SSyncNode* pSyncNode, SyncRequestVote* pM
} }
if (pMsg->lastLogTerm == myLastTerm && pMsg->lastLogIndex >= myLastIndex) { if (pMsg->lastLogTerm == myLastTerm && pMsg->lastLogIndex >= myLastIndex) {
sNTrace(pSyncNode, sNTrace(ths,
"logok:1, {my-lterm:%" PRIu64 ", my-lindex:%" PRId64 ", recv-lterm:%" PRIu64 ", recv-lindex:%" PRId64 "logok:1, {my-lterm:%" PRIu64 ", my-lindex:%" PRId64 ", recv-lterm:%" PRIu64 ", recv-lindex:%" PRId64
", recv-term:%" PRIu64 "}", ", recv-term:%" PRIu64 "}",
myLastTerm, myLastIndex, pMsg->lastLogTerm, pMsg->lastLogIndex, pMsg->term); myLastTerm, myLastIndex, pMsg->lastLogTerm, pMsg->lastLogIndex, pMsg->term);
return true; return true;
} }
sNTrace(pSyncNode, sNTrace(ths,
"logok:0, {my-lterm:%" PRIu64 ", my-lindex:%" PRId64 ", recv-lterm:%" PRIu64 ", recv-lindex:%" PRId64 "logok:0, {my-lterm:%" PRIu64 ", my-lindex:%" PRId64 ", recv-lterm:%" PRIu64 ", recv-lindex:%" PRId64
", recv-term:%" PRIu64 "}", ", recv-term:%" PRIu64 "}",
myLastTerm, myLastIndex, pMsg->lastLogTerm, pMsg->lastLogIndex, pMsg->term); myLastTerm, myLastIndex, pMsg->lastLogTerm, pMsg->lastLogIndex, pMsg->term);
...@@ -93,7 +93,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -93,7 +93,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
SyncRequestVote* pMsg = pRpcMsg->pCont; SyncRequestVote* pMsg = pRpcMsg->pCont;
// if already drop replica, do not process // if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) { if (!syncNodeInRaftGroup(ths, &pMsg->srcId)) {
syncLogRecvRequestVote(ths, pMsg, -1, "not in my config"); syncLogRecvRequestVote(ths, pMsg, -1, "not in my config");
return -1; return -1;
} }
...@@ -101,21 +101,21 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -101,21 +101,21 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
bool logOK = syncNodeOnRequestVoteLogOK(ths, pMsg); bool logOK = syncNodeOnRequestVoteLogOK(ths, pMsg);
// maybe update term // maybe update term
if (pMsg->term > ths->pRaftStore->currentTerm) { if (pMsg->term > ths->raftStore.currentTerm) {
syncNodeStepDown(ths, pMsg->term); syncNodeStepDown(ths, pMsg->term);
// syncNodeUpdateTerm(ths, pMsg->term); // syncNodeUpdateTerm(ths, pMsg->term);
} }
ASSERT(pMsg->term <= ths->pRaftStore->currentTerm); ASSERT(pMsg->term <= ths->raftStore.currentTerm);
bool grant = (pMsg->term == ths->pRaftStore->currentTerm) && logOK && bool grant = (pMsg->term == ths->raftStore.currentTerm) && logOK &&
((!raftStoreHasVoted(ths->pRaftStore)) || (syncUtilSameId(&(ths->pRaftStore->voteFor), &(pMsg->srcId)))); ((!raftStoreHasVoted(ths)) || (syncUtilSameId(&ths->raftStore.voteFor, &pMsg->srcId)));
if (grant) { if (grant) {
// maybe has already voted for pMsg->srcId // maybe has already voted for pMsg->srcId
// vote again, no harm // vote again, no harm
raftStoreVote(ths->pRaftStore, &(pMsg->srcId)); raftStoreVote(ths, &(pMsg->srcId));
// candidate ? // candidate ?
syncNodeStepDown(ths, ths->pRaftStore->currentTerm); syncNodeStepDown(ths, ths->raftStore.currentTerm);
// forbid elect for this round // forbid elect for this round
syncNodeResetElectTimer(ths); syncNodeResetElectTimer(ths);
...@@ -129,7 +129,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -129,7 +129,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
SyncRequestVoteReply* pReply = rpcMsg.pCont; SyncRequestVoteReply* pReply = rpcMsg.pCont;
pReply->srcId = ths->myRaftId; pReply->srcId = ths->myRaftId;
pReply->destId = pMsg->srcId; pReply->destId = pMsg->srcId;
pReply->term = ths->pRaftStore->currentTerm; pReply->term = ths->raftStore.currentTerm;
pReply->voteGranted = grant; pReply->voteGranted = grant;
// trace log // trace log
......
...@@ -49,25 +49,25 @@ int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -49,25 +49,25 @@ int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
} }
// drop stale response // drop stale response
if (pMsg->term < ths->pRaftStore->currentTerm) { if (pMsg->term < ths->raftStore.currentTerm) {
syncLogRecvRequestVoteReply(ths, pMsg, "drop stale response"); syncLogRecvRequestVoteReply(ths, pMsg, "drop stale response");
return -1; return -1;
} }
// ASSERT(!(pMsg->term > ths->pRaftStore->currentTerm)); // ASSERT(!(pMsg->term > ths->raftStore.currentTerm));
// no need this code, because if I receive reply.term, then I must have sent for that term. // no need this code, because if I receive reply.term, then I must have sent for that term.
// if (pMsg->term > ths->pRaftStore->currentTerm) { // if (pMsg->term > ths->raftStore.currentTerm) {
// syncNodeUpdateTerm(ths, pMsg->term); // syncNodeUpdateTerm(ths, pMsg->term);
// } // }
if (pMsg->term > ths->pRaftStore->currentTerm) { if (pMsg->term > ths->raftStore.currentTerm) {
syncLogRecvRequestVoteReply(ths, pMsg, "error term"); syncLogRecvRequestVoteReply(ths, pMsg, "error term");
syncNodeStepDown(ths, pMsg->term); syncNodeStepDown(ths, pMsg->term);
return -1; return -1;
} }
syncLogRecvRequestVoteReply(ths, pMsg, ""); syncLogRecvRequestVoteReply(ths, pMsg, "");
ASSERT(pMsg->term == ths->pRaftStore->currentTerm); ASSERT(pMsg->term == ths->raftStore.currentTerm);
// This tallies votes even when the current state is not Candidate, // This tallies votes even when the current state is not Candidate,
// but they won't be looked at, so it doesn't matter. // but they won't be looked at, so it doesn't matter.
......
...@@ -143,7 +143,7 @@ static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) { ...@@ -143,7 +143,7 @@ static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
.state = pNode->state, .state = pNode->state,
.seqNum = *pSeqNum, .seqNum = *pSeqNum,
.term = SYNC_TERM_INVALID, .term = SYNC_TERM_INVALID,
.currentTerm = pNode->pRaftStore->currentTerm, .currentTerm = pNode->raftStore.currentTerm,
.flag = 0, .flag = 0,
}; };
......
...@@ -43,7 +43,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI ...@@ -43,7 +43,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS; pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS;
pSender->pSyncNode = pSyncNode; pSender->pSyncNode = pSyncNode;
pSender->replicaIndex = replicaIndex; pSender->replicaIndex = replicaIndex;
pSender->term = pSyncNode->pRaftStore->currentTerm; pSender->term = pSyncNode->raftStore.currentTerm;
pSender->startTime = 0; pSender->startTime = 0;
pSender->endTime = 0; pSender->endTime = 0;
pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot); pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot);
...@@ -90,7 +90,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { ...@@ -90,7 +90,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
memset(&pSender->lastConfig, 0, sizeof(pSender->lastConfig)); memset(&pSender->lastConfig, 0, sizeof(pSender->lastConfig));
pSender->sendingMS = 0; pSender->sendingMS = 0;
pSender->term = pSender->pSyncNode->pRaftStore->currentTerm; pSender->term = pSender->pSyncNode->raftStore.currentTerm;
pSender->startTime = taosGetTimestampMs(); pSender->startTime = taosGetTimestampMs();
pSender->lastSendTime = pSender->startTime; pSender->lastSendTime = pSender->startTime;
pSender->finish = false; pSender->finish = false;
...@@ -105,7 +105,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { ...@@ -105,7 +105,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
SyncSnapshotSend *pMsg = rpcMsg.pCont; SyncSnapshotSend *pMsg = rpcMsg.pCont;
pMsg->srcId = pSender->pSyncNode->myRaftId; pMsg->srcId = pSender->pSyncNode->myRaftId;
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm; pMsg->term = pSender->pSyncNode->raftStore.currentTerm;
pMsg->beginIndex = pSender->snapshotParam.start; pMsg->beginIndex = pSender->snapshotParam.start;
pMsg->lastIndex = pSender->snapshot.lastApplyIndex; pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
pMsg->lastTerm = pSender->snapshot.lastApplyTerm; pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
...@@ -185,7 +185,7 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) { ...@@ -185,7 +185,7 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
SyncSnapshotSend *pMsg = rpcMsg.pCont; SyncSnapshotSend *pMsg = rpcMsg.pCont;
pMsg->srcId = pSender->pSyncNode->myRaftId; pMsg->srcId = pSender->pSyncNode->myRaftId;
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm; pMsg->term = pSender->pSyncNode->raftStore.currentTerm;
pMsg->beginIndex = pSender->snapshotParam.start; pMsg->beginIndex = pSender->snapshotParam.start;
pMsg->lastIndex = pSender->snapshot.lastApplyIndex; pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
pMsg->lastTerm = pSender->snapshot.lastApplyTerm; pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
...@@ -226,7 +226,7 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { ...@@ -226,7 +226,7 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
SyncSnapshotSend *pMsg = rpcMsg.pCont; SyncSnapshotSend *pMsg = rpcMsg.pCont;
pMsg->srcId = pSender->pSyncNode->myRaftId; pMsg->srcId = pSender->pSyncNode->myRaftId;
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm; pMsg->term = pSender->pSyncNode->raftStore.currentTerm;
pMsg->beginIndex = pSender->snapshotParam.start; pMsg->beginIndex = pSender->snapshotParam.start;
pMsg->lastIndex = pSender->snapshot.lastApplyIndex; pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
pMsg->lastTerm = pSender->snapshot.lastApplyTerm; pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
...@@ -314,7 +314,7 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from ...@@ -314,7 +314,7 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from
pReceiver->pWriter = NULL; pReceiver->pWriter = NULL;
pReceiver->pSyncNode = pSyncNode; pReceiver->pSyncNode = pSyncNode;
pReceiver->fromId = fromId; pReceiver->fromId = fromId;
pReceiver->term = pSyncNode->pRaftStore->currentTerm; pReceiver->term = pSyncNode->raftStore.currentTerm;
pReceiver->snapshot.data = NULL; pReceiver->snapshot.data = NULL;
pReceiver->snapshot.lastApplyIndex = SYNC_INDEX_INVALID; pReceiver->snapshot.lastApplyIndex = SYNC_INDEX_INVALID;
pReceiver->snapshot.lastApplyTerm = 0; pReceiver->snapshot.lastApplyTerm = 0;
...@@ -380,7 +380,7 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *p ...@@ -380,7 +380,7 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *p
pReceiver->start = true; pReceiver->start = true;
pReceiver->ack = SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT; pReceiver->ack = SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT;
pReceiver->term = pReceiver->pSyncNode->pRaftStore->currentTerm; pReceiver->term = pReceiver->pSyncNode->raftStore.currentTerm;
pReceiver->fromId = pPreMsg->srcId; pReceiver->fromId = pPreMsg->srcId;
pReceiver->startTime = pPreMsg->startTime; pReceiver->startTime = pPreMsg->startTime;
...@@ -437,9 +437,9 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap ...@@ -437,9 +437,9 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap
} }
// maybe update term // maybe update term
if (pReceiver->snapshot.lastApplyTerm > pReceiver->pSyncNode->pRaftStore->currentTerm) { if (pReceiver->snapshot.lastApplyTerm > pReceiver->pSyncNode->raftStore.currentTerm) {
pReceiver->pSyncNode->pRaftStore->currentTerm = pReceiver->snapshot.lastApplyTerm; pReceiver->pSyncNode->raftStore.currentTerm = pReceiver->snapshot.lastApplyTerm;
raftStorePersist(pReceiver->pSyncNode->pRaftStore); (void)raftStoreWriteFile(pReceiver->pSyncNode);
} }
// stop writer, apply data // stop writer, apply data
...@@ -592,7 +592,7 @@ _SEND_REPLY: ...@@ -592,7 +592,7 @@ _SEND_REPLY:
SyncSnapshotRsp *pRspMsg = rpcMsg.pCont; SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
pRspMsg->srcId = pSyncNode->myRaftId; pRspMsg->srcId = pSyncNode->myRaftId;
pRspMsg->destId = pMsg->srcId; pRspMsg->destId = pMsg->srcId;
pRspMsg->term = pSyncNode->pRaftStore->currentTerm; pRspMsg->term = pSyncNode->raftStore.currentTerm;
pRspMsg->lastIndex = pMsg->lastIndex; pRspMsg->lastIndex = pMsg->lastIndex;
pRspMsg->lastTerm = pMsg->lastTerm; pRspMsg->lastTerm = pMsg->lastTerm;
pRspMsg->startTime = pReceiver->startTime; pRspMsg->startTime = pReceiver->startTime;
...@@ -648,7 +648,7 @@ _SEND_REPLY: ...@@ -648,7 +648,7 @@ _SEND_REPLY:
SyncSnapshotRsp *pRspMsg = rpcMsg.pCont; SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
pRspMsg->srcId = pSyncNode->myRaftId; pRspMsg->srcId = pSyncNode->myRaftId;
pRspMsg->destId = pMsg->srcId; pRspMsg->destId = pMsg->srcId;
pRspMsg->term = pSyncNode->pRaftStore->currentTerm; pRspMsg->term = pSyncNode->raftStore.currentTerm;
pRspMsg->lastIndex = pMsg->lastIndex; pRspMsg->lastIndex = pMsg->lastIndex;
pRspMsg->lastTerm = pMsg->lastTerm; pRspMsg->lastTerm = pMsg->lastTerm;
pRspMsg->startTime = pReceiver->startTime; pRspMsg->startTime = pReceiver->startTime;
...@@ -698,7 +698,7 @@ static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend ...@@ -698,7 +698,7 @@ static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend
SyncSnapshotRsp *pRspMsg = rpcMsg.pCont; SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
pRspMsg->srcId = pSyncNode->myRaftId; pRspMsg->srcId = pSyncNode->myRaftId;
pRspMsg->destId = pMsg->srcId; pRspMsg->destId = pMsg->srcId;
pRspMsg->term = pSyncNode->pRaftStore->currentTerm; pRspMsg->term = pSyncNode->raftStore.currentTerm;
pRspMsg->lastIndex = pMsg->lastIndex; pRspMsg->lastIndex = pMsg->lastIndex;
pRspMsg->lastTerm = pMsg->lastTerm; pRspMsg->lastTerm = pMsg->lastTerm;
pRspMsg->startTime = pReceiver->startTime; pRspMsg->startTime = pReceiver->startTime;
...@@ -745,7 +745,7 @@ static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMs ...@@ -745,7 +745,7 @@ static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMs
SyncSnapshotRsp *pRspMsg = rpcMsg.pCont; SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
pRspMsg->srcId = pSyncNode->myRaftId; pRspMsg->srcId = pSyncNode->myRaftId;
pRspMsg->destId = pMsg->srcId; pRspMsg->destId = pMsg->srcId;
pRspMsg->term = pSyncNode->pRaftStore->currentTerm; pRspMsg->term = pSyncNode->raftStore.currentTerm;
pRspMsg->lastIndex = pMsg->lastIndex; pRspMsg->lastIndex = pMsg->lastIndex;
pRspMsg->lastTerm = pMsg->lastTerm; pRspMsg->lastTerm = pMsg->lastTerm;
pRspMsg->startTime = pReceiver->startTime; pRspMsg->startTime = pReceiver->startTime;
...@@ -794,13 +794,13 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { ...@@ -794,13 +794,13 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
return -1; return -1;
} }
if (pMsg->term < pSyncNode->pRaftStore->currentTerm) { if (pMsg->term < pSyncNode->raftStore.currentTerm) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "reject since small term"); syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "reject since small term");
terrno = TSDB_CODE_SYN_INTERNAL_ERROR; terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1; return -1;
} }
if (pMsg->term > pSyncNode->pRaftStore->currentTerm) { if (pMsg->term > pSyncNode->raftStore.currentTerm) {
syncNodeStepDown(pSyncNode, pMsg->term); syncNodeStepDown(pSyncNode, pMsg->term);
} }
syncNodeResetElectTimer(pSyncNode); syncNodeResetElectTimer(pSyncNode);
...@@ -808,7 +808,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { ...@@ -808,7 +808,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
// state, term, seq/ack // state, term, seq/ack
int32_t code = 0; int32_t code = 0;
if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) { if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
if (pMsg->term == pSyncNode->pRaftStore->currentTerm) { if (pMsg->term == pSyncNode->raftStore.currentTerm) {
if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT) { if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq pre-snapshot"); syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq pre-snapshot");
code = syncNodeOnSnapshotPre(pSyncNode, pMsg); code = syncNodeOnSnapshotPre(pSyncNode, pMsg);
...@@ -892,7 +892,7 @@ static int32_t syncNodeOnSnapshotPreRsp(SSyncNode *pSyncNode, SSyncSnapshotSende ...@@ -892,7 +892,7 @@ static int32_t syncNodeOnSnapshotPreRsp(SSyncNode *pSyncNode, SSyncSnapshotSende
SyncSnapshotSend *pSendMsg = rpcMsg.pCont; SyncSnapshotSend *pSendMsg = rpcMsg.pCont;
pSendMsg->srcId = pSender->pSyncNode->myRaftId; pSendMsg->srcId = pSender->pSyncNode->myRaftId;
pSendMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; pSendMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
pSendMsg->term = pSender->pSyncNode->pRaftStore->currentTerm; pSendMsg->term = pSender->pSyncNode->raftStore.currentTerm;
pSendMsg->beginIndex = pSender->snapshotParam.start; pSendMsg->beginIndex = pSender->snapshotParam.start;
pSendMsg->lastIndex = pSender->snapshot.lastApplyIndex; pSendMsg->lastIndex = pSender->snapshot.lastApplyIndex;
pSendMsg->lastTerm = pSender->snapshot.lastApplyTerm; pSendMsg->lastTerm = pSender->snapshot.lastApplyTerm;
...@@ -951,10 +951,10 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { ...@@ -951,10 +951,10 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
goto _ERROR; goto _ERROR;
} }
if (pMsg->term != pSyncNode->pRaftStore->currentTerm) { if (pMsg->term != pSyncNode->raftStore.currentTerm) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender and receiver term not match"); syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender and receiver term not match");
sSError(pSender, "snapshot sender term not equal, msg term:%" PRId64 " currentTerm:%" PRId64, pMsg->term, sSError(pSender, "snapshot sender term not equal, msg term:%" PRId64 " currentTerm:%" PRId64, pMsg->term,
pSyncNode->pRaftStore->currentTerm); pSyncNode->raftStore.currentTerm);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR; terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _ERROR; goto _ERROR;
} }
......
...@@ -158,8 +158,8 @@ static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) { ...@@ -158,8 +158,8 @@ static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
} }
void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...) { void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...) {
if (pNode == NULL || pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return; if (pNode == NULL || pNode->pLogStore == NULL) return;
int64_t currentTerm = pNode->pRaftStore->currentTerm; int64_t currentTerm = pNode->raftStore.currentTerm;
// save error code, otherwise it will be overwritten // save error code, otherwise it will be overwritten
int32_t errCode = terrno; int32_t errCode = terrno;
...@@ -228,7 +228,7 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo ...@@ -228,7 +228,7 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotSender* pSender, void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotSender* pSender,
const char* format, ...) { const char* format, ...) {
SSyncNode* pNode = pSender->pSyncNode; SSyncNode* pNode = pSender->pSyncNode;
if (pNode == NULL || pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return; if (pNode == NULL || pNode->pLogStore == NULL) return;
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) { if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) {
...@@ -264,7 +264,7 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla ...@@ -264,7 +264,7 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla
pNode->vgId, eventLog, syncStr(pNode->state), pSender, pSender->snapshotParam.start, pNode->vgId, eventLog, syncStr(pNode->state), pSender, pSender->snapshotParam.start,
pSender->snapshotParam.end, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshotParam.end, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm,
pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack, pSender->finish, pSender->replicaIndex, pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack, pSender->finish, pSender->replicaIndex,
DID(&pNode->replicasId[pSender->replicaIndex]), pNode->pRaftStore->currentTerm, pNode->commitIndex, DID(&pNode->replicasId[pSender->replicaIndex]), pNode->raftStore.currentTerm, pNode->commitIndex,
logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
pNode->raftCfg.isStandBy, pNode->raftCfg.snapshotStrategy, pNode->raftCfg.batchSize, pNode->replicaNum, pNode->raftCfg.isStandBy, pNode->raftCfg.snapshotStrategy, pNode->raftCfg.batchSize, pNode->replicaNum,
pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, syncNodeDynamicQuorum(pNode), pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, syncNodeDynamicQuorum(pNode),
...@@ -274,7 +274,7 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla ...@@ -274,7 +274,7 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla
void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotReceiver* pReceiver, void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotReceiver* pReceiver,
const char* format, ...) { const char* format, ...) {
SSyncNode* pNode = pReceiver->pSyncNode; SSyncNode* pNode = pReceiver->pSyncNode;
if (pNode == NULL || pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return; if (pNode == NULL || pNode->pLogStore == NULL) return;
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) { if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) {
...@@ -311,7 +311,7 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df ...@@ -311,7 +311,7 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df
pNode->vgId, eventLog, syncStr(pNode->state), pReceiver, pReceiver->start, pReceiver->ack, pNode->vgId, eventLog, syncStr(pNode->state), pReceiver, pReceiver->start, pReceiver->ack,
pReceiver->term, pReceiver->startTime, DID(&pReceiver->fromId), pReceiver->snapshotParam.start, pReceiver->term, pReceiver->startTime, DID(&pReceiver->fromId), pReceiver->snapshotParam.start,
pReceiver->snapshotParam.end, pReceiver->snapshot.lastApplyIndex, pReceiver->snapshot.lastApplyTerm, pReceiver->snapshotParam.end, pReceiver->snapshot.lastApplyIndex, pReceiver->snapshot.lastApplyTerm,
pReceiver->snapshot.lastConfigIndex, pNode->pRaftStore->currentTerm, pNode->commitIndex, logBeginIndex, pReceiver->snapshot.lastConfigIndex, pNode->raftStore.currentTerm, pNode->commitIndex, logBeginIndex,
logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
pNode->raftCfg.isStandBy, pNode->raftCfg.snapshotStrategy, pNode->raftCfg.batchSize, pNode->replicaNum, pNode->raftCfg.isStandBy, pNode->raftCfg.snapshotStrategy, pNode->raftCfg.batchSize, pNode->replicaNum,
pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, syncNodeDynamicQuorum(pNode), pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, syncNodeDynamicQuorum(pNode),
......
...@@ -16,8 +16,8 @@ SyncLocalCmd *createMsg() { ...@@ -16,8 +16,8 @@ SyncLocalCmd *createMsg() {
pMsg->srcId.vgId = 100; pMsg->srcId.vgId = 100;
pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678);
pMsg->destId.vgId = 100; pMsg->destId.vgId = 100;
pMsg->sdNewTerm = 123; // pMsg->sdNewTerm = 123;
pMsg->fcIndex = 456; // pMsg->fcIndex = 456;
pMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN; pMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
return pMsg; return pMsg;
......
...@@ -33,35 +33,35 @@ int main() { ...@@ -33,35 +33,35 @@ int main() {
initRaftId(); initRaftId();
SRaftStore* pRaftStore = raftStoreOpen("./test_raft_store.json"); // SRaftStore* pRaftStore = raftStoreOpen("./test_raft_store.json");
assert(pRaftStore != NULL); // assert(pRaftStore != NULL);
raftStoreLog2((char*)"==raftStoreOpen==", pRaftStore); // raftStoreLog2((char*)"==raftStoreOpen==", pRaftStore);
raftStoreSetTerm(pRaftStore, 100); // raftStoreSetTerm(pRaftStore, 100);
raftStoreLog2((char*)"==raftStoreSetTerm==", pRaftStore); // raftStoreLog2((char*)"==raftStoreSetTerm==", pRaftStore);
raftStoreVote(pRaftStore, &ids[0]); // raftStoreVote(pRaftStore, &ids[0]);
raftStoreLog2((char*)"==raftStoreVote==", pRaftStore); // raftStoreLog2((char*)"==raftStoreVote==", pRaftStore);
raftStoreClearVote(pRaftStore); // raftStoreClearVote(pRaftStore);
raftStoreLog2((char*)"==raftStoreClearVote==", pRaftStore); // raftStoreLog2((char*)"==raftStoreClearVote==", pRaftStore);
raftStoreVote(pRaftStore, &ids[1]); // raftStoreVote(pRaftStore, &ids[1]);
raftStoreLog2((char*)"==raftStoreVote==", pRaftStore); // raftStoreLog2((char*)"==raftStoreVote==", pRaftStore);
raftStoreNextTerm(pRaftStore); // raftStoreNextTerm(pRaftStore);
raftStoreLog2((char*)"==raftStoreNextTerm==", pRaftStore); // raftStoreLog2((char*)"==raftStoreNextTerm==", pRaftStore);
raftStoreNextTerm(pRaftStore); // raftStoreNextTerm(pRaftStore);
raftStoreLog2((char*)"==raftStoreNextTerm==", pRaftStore); // raftStoreLog2((char*)"==raftStoreNextTerm==", pRaftStore);
raftStoreNextTerm(pRaftStore); // raftStoreNextTerm(pRaftStore);
raftStoreLog2((char*)"==raftStoreNextTerm==", pRaftStore); // raftStoreLog2((char*)"==raftStoreNextTerm==", pRaftStore);
raftStoreNextTerm(pRaftStore); // raftStoreNextTerm(pRaftStore);
raftStoreLog2((char*)"==raftStoreNextTerm==", pRaftStore); // raftStoreLog2((char*)"==raftStoreNextTerm==", pRaftStore);
raftStoreClose(pRaftStore); // raftStoreClose(pRaftStore);
return 0; return 0;
} }
...@@ -29,7 +29,7 @@ int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_ ...@@ -29,7 +29,7 @@ int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_
SSyncSnapshotReceiver* createReceiver() { SSyncSnapshotReceiver* createReceiver() {
SSyncNode* pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(*pSyncNode)); SSyncNode* pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(*pSyncNode));
pSyncNode->pRaftStore = (SRaftStore*)taosMemoryMalloc(sizeof(*(pSyncNode->pRaftStore))); // pSyncNode->pRaftStore = (SRaftStore*)taosMemoryMalloc(sizeof(*(pSyncNode->pRaftStore)));
pSyncNode->pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(*(pSyncNode->pFsm))); pSyncNode->pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(*(pSyncNode->pFsm)));
#if 0 #if 0
......
...@@ -29,7 +29,7 @@ int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_ ...@@ -29,7 +29,7 @@ int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_
SSyncSnapshotSender* createSender() { SSyncSnapshotSender* createSender() {
SSyncNode* pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(*pSyncNode)); SSyncNode* pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(*pSyncNode));
pSyncNode->pRaftStore = (SRaftStore*)taosMemoryMalloc(sizeof(*(pSyncNode->pRaftStore))); // pSyncNode->pRaftStore = (SRaftStore*)taosMemoryMalloc(sizeof(*(pSyncNode->pRaftStore)));
pSyncNode->pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(*(pSyncNode->pFsm))); pSyncNode->pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(*(pSyncNode->pFsm)));
#if 0 #if 0
......
...@@ -80,7 +80,7 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) { ...@@ -80,7 +80,7 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
// tla+ server vars // tla+ server vars
cJSON_AddNumberToObject(pRoot, "state", pSyncNode->state); cJSON_AddNumberToObject(pRoot, "state", pSyncNode->state);
cJSON_AddStringToObject(pRoot, "state_str", syncStr(pSyncNode->state)); cJSON_AddStringToObject(pRoot, "state_str", syncStr(pSyncNode->state));
cJSON_AddItemToObject(pRoot, "pRaftStore", raftStore2Json(pSyncNode->pRaftStore)); // cJSON_AddItemToObject(pRoot, "pRaftStore", raftStore2Json(&pSyncNode.raftStore));
// tla+ candidate vars // tla+ candidate vars
cJSON_AddItemToObject(pRoot, "pVotesGranted", voteGranted2Json(pSyncNode->pVotesGranted)); cJSON_AddItemToObject(pRoot, "pVotesGranted", voteGranted2Json(pSyncNode->pVotesGranted));
...@@ -199,7 +199,7 @@ inline char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { ...@@ -199,7 +199,7 @@ inline char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
", sby:%d, " ", sby:%d, "
"r-num:%d, " "r-num:%d, "
"lcfg:%" PRId64 ", chging:%d, rsto:%d", "lcfg:%" PRId64 ", chging:%d, rsto:%d",
pSyncNode->vgId, syncStr(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, pSyncNode->vgId, syncStr(pSyncNode->state), pSyncNode->raftStore.currentTerm, pSyncNode->commitIndex,
logBeginIndex, logLastIndex, snapshot.lastApplyIndex, pSyncNode->raftCfg.isStandBy, pSyncNode->replicaNum, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, pSyncNode->raftCfg.isStandBy, pSyncNode->replicaNum,
pSyncNode->raftCfg.lastConfigIndex, pSyncNode->changing, pSyncNode->restoreFinish); pSyncNode->raftCfg.lastConfigIndex, pSyncNode->changing, pSyncNode->restoreFinish);
......
...@@ -2858,11 +2858,11 @@ cJSON* syncLocalCmd2Json(const SyncLocalCmd* pMsg) { ...@@ -2858,11 +2858,11 @@ cJSON* syncLocalCmd2Json(const SyncLocalCmd* pMsg) {
cJSON_AddNumberToObject(pRoot, "cmd", pMsg->cmd); cJSON_AddNumberToObject(pRoot, "cmd", pMsg->cmd);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->sdNewTerm); // snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->sdNewTerm);
cJSON_AddStringToObject(pRoot, "sd-new-term", u64buf); // cJSON_AddStringToObject(pRoot, "sd-new-term", u64buf);
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->fcIndex); // snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->fcIndex);
cJSON_AddStringToObject(pRoot, "fc-index", u64buf); // cJSON_AddStringToObject(pRoot, "fc-index", u64buf);
} }
cJSON* pJson = cJSON_CreateObject(); cJSON* pJson = cJSON_CreateObject();
......
...@@ -41,8 +41,8 @@ cJSON *raftStore2Json(SRaftStore *pRaftStore) { ...@@ -41,8 +41,8 @@ cJSON *raftStore2Json(SRaftStore *pRaftStore) {
cJSON_AddNumberToObject(pVoteFor, "vgId", pRaftStore->voteFor.vgId); cJSON_AddNumberToObject(pVoteFor, "vgId", pRaftStore->voteFor.vgId);
cJSON_AddItemToObject(pRoot, "voteFor", pVoteFor); cJSON_AddItemToObject(pRoot, "voteFor", pVoteFor);
int hasVoted = raftStoreHasVoted(pRaftStore); // int hasVoted = raftStoreHasVoted(pRaftStore);
cJSON_AddNumberToObject(pRoot, "hasVoted", hasVoted); // cJSON_AddNumberToObject(pRoot, "hasVoted", hasVoted);
} }
cJSON *pJson = cJSON_CreateObject(); cJSON *pJson = cJSON_CreateObject();
......
...@@ -137,7 +137,7 @@ int32_t syncNodeOnPreSnapshot(SSyncNode *ths, SyncPreSnapshot *pMsg) { ...@@ -137,7 +137,7 @@ int32_t syncNodeOnPreSnapshot(SSyncNode *ths, SyncPreSnapshot *pMsg) {
SyncPreSnapshotReply *pMsgReply = syncPreSnapshotReplyBuild(ths->vgId); SyncPreSnapshotReply *pMsgReply = syncPreSnapshotReplyBuild(ths->vgId);
pMsgReply->srcId = ths->myRaftId; pMsgReply->srcId = ths->myRaftId;
pMsgReply->destId = pMsg->srcId; pMsgReply->destId = pMsg->srcId;
pMsgReply->term = ths->pRaftStore->currentTerm; pMsgReply->term = ths->raftStore.currentTerm;
SSyncLogStoreData *pData = ths->pLogStore->data; SSyncLogStoreData *pData = ths->pLogStore->data;
SWal *pWal = pData->pWal; SWal *pWal = pData->pWal;
......
...@@ -1045,7 +1045,7 @@ bool taosAssertRelease(bool condition) { ...@@ -1045,7 +1045,7 @@ bool taosAssertRelease(bool condition) {
int32_t dflag = 255; // tsLogEmbedded ? 255 : uDebugFlag int32_t dflag = 255; // tsLogEmbedded ? 255 : uDebugFlag
taosPrintLog(flags, level, dflag, "tAssert called in release mode, exit:%d", tsAssert); taosPrintLog(flags, level, dflag, "tAssert called in release mode, exit:%d", tsAssert);
taosPrintTrace(flags, level, dflag); taosPrintTrace(flags, level, dflag, 0);
if (tsAssert) { if (tsAssert) {
taosMsleep(300); taosMsleep(300);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册