提交 99bfc95d 编写于 作者: B Benguang Zhao

fix: synchronize access to raftStore

上级 6122a2fb
...@@ -71,6 +71,7 @@ typedef struct SRaftId { ...@@ -71,6 +71,7 @@ typedef struct SRaftId {
typedef struct SRaftStore { typedef struct SRaftStore {
SyncTerm currentTerm; SyncTerm currentTerm;
SRaftId voteFor; SRaftId voteFor;
TdThreadMutex mutex;
} SRaftStore; } SRaftStore;
typedef struct SSyncHbTimerData { typedef struct SSyncHbTimerData {
...@@ -282,7 +283,7 @@ void syncNodeCandidate2Follower(SSyncNode* pSyncNode); ...@@ -282,7 +283,7 @@ void syncNodeCandidate2Follower(SSyncNode* pSyncNode);
// raft vote -------------- // raft vote --------------
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId); void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId);
void syncNodeVoteForSelf(SSyncNode* pSyncNode); void syncNodeVoteForSelf(SSyncNode* pSyncNode, SyncTerm term);
// log replication // log replication
SSyncLogReplMgr* syncNodeGetLogReplMgr(SSyncNode* pNode, SRaftId* pDestId); SSyncLogReplMgr* syncNodeGetLogReplMgr(SSyncNode* pNode, SRaftId* pDestId);
......
...@@ -26,14 +26,15 @@ extern "C" { ...@@ -26,14 +26,15 @@ extern "C" {
#define RAFT_STORE_PATH_LEN (TSDB_FILENAME_LEN * 2) #define RAFT_STORE_PATH_LEN (TSDB_FILENAME_LEN * 2)
#define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0}) #define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0})
int32_t raftStoreReadFile(SSyncNode *pNode); int32_t raftStoreOpen(SSyncNode *pNode);
int32_t raftStoreWriteFile(SSyncNode *pNode); void raftStoreClose(SSyncNode *pNode);
bool raftStoreHasVoted(SSyncNode *pNode); bool raftStoreHasVoted(SSyncNode *pNode);
void raftStoreVote(SSyncNode *pNode, SRaftId *pRaftId); void raftStoreVote(SSyncNode *pNode, SRaftId *pRaftId);
void raftStoreClearVote(SSyncNode *pNode); void raftStoreClearVote(SSyncNode *pNode);
void raftStoreNextTerm(SSyncNode *pNode); void raftStoreNextTerm(SSyncNode *pNode);
void raftStoreSetTerm(SSyncNode *pNode, SyncTerm term); void raftStoreSetTerm(SSyncNode *pNode, SyncTerm term);
SyncTerm raftStoreGetTerm(SSyncNode *pNode);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -120,17 +120,17 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -120,17 +120,17 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
// prepare response msg // prepare response msg
pReply->srcId = ths->myRaftId; pReply->srcId = ths->myRaftId;
pReply->destId = pMsg->srcId; pReply->destId = pMsg->srcId;
pReply->term = ths->raftStore.currentTerm; pReply->term = raftStoreGetTerm(ths);
pReply->success = false; pReply->success = false;
pReply->matchIndex = SYNC_INDEX_INVALID; pReply->matchIndex = SYNC_INDEX_INVALID;
pReply->lastSendIndex = pMsg->prevLogIndex + 1; pReply->lastSendIndex = pMsg->prevLogIndex + 1;
pReply->startTime = ths->startTime; pReply->startTime = ths->startTime;
if (pMsg->term < ths->raftStore.currentTerm) { if (pMsg->term < raftStoreGetTerm(ths)) {
goto _SEND_RESPONSE; goto _SEND_RESPONSE;
} }
if (pMsg->term > ths->raftStore.currentTerm) { if (pMsg->term > raftStoreGetTerm(ths)) {
pReply->term = pMsg->term; pReply->term = pMsg->term;
} }
......
...@@ -50,19 +50,19 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -50,19 +50,19 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
} }
// drop stale response // drop stale response
if (pMsg->term < ths->raftStore.currentTerm) { if (pMsg->term < raftStoreGetTerm(ths)) {
syncLogRecvAppendEntriesReply(ths, pMsg, "drop stale response"); syncLogRecvAppendEntriesReply(ths, pMsg, "drop stale response");
return 0; return 0;
} }
if (ths->state == TAOS_SYNC_STATE_LEADER) { if (ths->state == TAOS_SYNC_STATE_LEADER) {
if (pMsg->term > ths->raftStore.currentTerm) { if (pMsg->term > raftStoreGetTerm(ths)) {
syncLogRecvAppendEntriesReply(ths, pMsg, "error term"); syncLogRecvAppendEntriesReply(ths, pMsg, "error term");
syncNodeStepDown(ths, pMsg->term); syncNodeStepDown(ths, pMsg->term);
return -1; return -1;
} }
ASSERT(pMsg->term == ths->raftStore.currentTerm); ASSERT(pMsg->term == raftStoreGetTerm(ths));
sTrace("vgId:%d, received append entries reply. srcId:0x%016" PRIx64 ", term:%" PRId64 ", matchIndex:%" PRId64 "", sTrace("vgId:%d, received append entries reply. srcId:0x%016" PRIx64 ", term:%" PRId64 ", matchIndex:%" PRId64 "",
pMsg->vgId, pMsg->srcId.addr, pMsg->term, pMsg->matchIndex); pMsg->vgId, pMsg->srcId.addr, pMsg->term, pMsg->matchIndex);
......
...@@ -111,7 +111,7 @@ int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely) { ...@@ -111,7 +111,7 @@ int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely) {
SyncIndex commitIndex = indexLikely; SyncIndex commitIndex = indexLikely;
syncNodeUpdateCommitIndex(ths, commitIndex); syncNodeUpdateCommitIndex(ths, commitIndex);
sTrace("vgId:%d, agreed upon. role:%d, term:%" PRId64 ", index:%" PRId64 "", ths->vgId, ths->state, sTrace("vgId:%d, agreed upon. role:%d, term:%" PRId64 ", index:%" PRId64 "", ths->vgId, ths->state,
ths->raftStore.currentTerm, commitIndex); raftStoreGetTerm(ths), commitIndex);
} }
return ths->commitIndex; return ths->commitIndex;
} }
...@@ -51,7 +51,7 @@ static int32_t syncNodeRequestVotePeers(SSyncNode* pNode) { ...@@ -51,7 +51,7 @@ static int32_t syncNodeRequestVotePeers(SSyncNode* pNode) {
SyncRequestVote* pMsg = rpcMsg.pCont; SyncRequestVote* pMsg = rpcMsg.pCont;
pMsg->srcId = pNode->myRaftId; pMsg->srcId = pNode->myRaftId;
pMsg->destId = pNode->peersId[i]; pMsg->destId = pNode->peersId[i];
pMsg->term = pNode->raftStore.currentTerm; pMsg->term = raftStoreGetTerm(pNode);
ret = syncNodeGetLastIndexTerm(pNode, &pMsg->lastLogIndex, &pMsg->lastLogTerm); ret = syncNodeGetLastIndexTerm(pNode, &pMsg->lastLogIndex, &pMsg->lastLogTerm);
if (ret < 0) { if (ret < 0) {
...@@ -85,10 +85,12 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) { ...@@ -85,10 +85,12 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) {
// start election // start election
raftStoreNextTerm(pSyncNode); raftStoreNextTerm(pSyncNode);
raftStoreClearVote(pSyncNode); raftStoreClearVote(pSyncNode);
voteGrantedReset(pSyncNode->pVotesGranted, pSyncNode->raftStore.currentTerm);
votesRespondReset(pSyncNode->pVotesRespond, pSyncNode->raftStore.currentTerm);
syncNodeVoteForSelf(pSyncNode); SyncTerm currentTerm = raftStoreGetTerm(pSyncNode);
voteGrantedReset(pSyncNode->pVotesGranted, currentTerm);
votesRespondReset(pSyncNode->pVotesRespond, currentTerm);
syncNodeVoteForSelf(pSyncNode, currentTerm);
if (voteGrantedMajority(pSyncNode->pVotesGranted)) { if (voteGrantedMajority(pSyncNode->pVotesGranted)) {
// only myself, to leader // only myself, to leader
ASSERT(!pSyncNode->pVotesGranted->toLeader); ASSERT(!pSyncNode->pVotesGranted->toLeader);
......
...@@ -41,7 +41,6 @@ ...@@ -41,7 +41,6 @@
static void syncNodeEqPingTimer(void* param, void* tmrId); static void syncNodeEqPingTimer(void* param, void* tmrId);
static void syncNodeEqElectTimer(void* param, void* tmrId); static void syncNodeEqElectTimer(void* param, void* tmrId);
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId); static void syncNodeEqHeartbeatTimer(void* param, void* tmrId);
static int32_t syncNodeEqNoop(SSyncNode* ths);
static int32_t syncNodeAppendNoop(SSyncNode* ths); static int32_t syncNodeAppendNoop(SSyncNode* ths);
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId); static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg); static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg);
...@@ -468,7 +467,7 @@ bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) { ...@@ -468,7 +467,7 @@ bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) {
} }
if (code == 0 && pEntry != NULL) { if (code == 0 && pEntry != NULL) {
if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == pSyncNode->raftStore.currentTerm) { if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == raftStoreGetTerm(pSyncNode)) {
ready = true; ready = true;
} }
...@@ -664,7 +663,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_ ...@@ -664,7 +663,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_
int32_t code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex); int32_t code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex);
if (code == 0) { if (code == 0) {
pMsg->info.conn.applyIndex = retIndex; pMsg->info.conn.applyIndex = retIndex;
pMsg->info.conn.applyTerm = pSyncNode->raftStore.currentTerm; pMsg->info.conn.applyTerm = raftStoreGetTerm(pSyncNode);
sTrace("vgId:%d, propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex, sTrace("vgId:%d, propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
TMSG_INFO(pMsg->msgType)); TMSG_INFO(pMsg->msgType));
return 1; return 1;
...@@ -911,7 +910,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { ...@@ -911,7 +910,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
// init TLA+ server vars // init TLA+ server vars
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
if (raftStoreReadFile(pSyncNode) != 0) { if (raftStoreOpen(pSyncNode) != 0) {
sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath); sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
goto _error; goto _error;
} }
...@@ -1212,7 +1211,12 @@ void syncNodeClose(SSyncNode* pSyncNode) { ...@@ -1212,7 +1211,12 @@ void syncNodeClose(SSyncNode* pSyncNode) {
if (pSyncNode == NULL) return; if (pSyncNode == NULL) return;
sNInfo(pSyncNode, "sync close, node:%p", pSyncNode); sNInfo(pSyncNode, "sync close, node:%p", pSyncNode);
syncNodeStopPingTimer(pSyncNode);
syncNodeStopElectTimer(pSyncNode);
syncNodeStopHeartbeatTimer(pSyncNode);
syncNodeLogReplMgrDestroy(pSyncNode); syncNodeLogReplMgrDestroy(pSyncNode);
syncRespMgrDestroy(pSyncNode->pSyncRespMgr); syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
pSyncNode->pSyncRespMgr = NULL; pSyncNode->pSyncRespMgr = NULL;
voteGrantedDestroy(pSyncNode->pVotesGranted); voteGrantedDestroy(pSyncNode->pVotesGranted);
...@@ -1228,10 +1232,6 @@ void syncNodeClose(SSyncNode* pSyncNode) { ...@@ -1228,10 +1232,6 @@ void syncNodeClose(SSyncNode* pSyncNode) {
syncLogBufferDestroy(pSyncNode->pLogBuf); syncLogBufferDestroy(pSyncNode->pLogBuf);
pSyncNode->pLogBuf = NULL; pSyncNode->pLogBuf = NULL;
syncNodeStopPingTimer(pSyncNode);
syncNodeStopElectTimer(pSyncNode);
syncNodeStopHeartbeatTimer(pSyncNode);
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) { for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
if (pSyncNode->senders[i] != NULL) { if (pSyncNode->senders[i] != NULL) {
sDebug("vgId:%d, snapshot sender destroy while close, data:%p", pSyncNode->vgId, pSyncNode->senders[i]); sDebug("vgId:%d, snapshot sender destroy while close, data:%p", pSyncNode->vgId, pSyncNode->senders[i]);
...@@ -1259,6 +1259,8 @@ void syncNodeClose(SSyncNode* pSyncNode) { ...@@ -1259,6 +1259,8 @@ void syncNodeClose(SSyncNode* pSyncNode) {
taosMemoryFree(pSyncNode->pFsm); taosMemoryFree(pSyncNode->pFsm);
} }
raftStoreClose(pSyncNode);
taosMemoryFree(pSyncNode); taosMemoryFree(pSyncNode);
} }
...@@ -1633,7 +1635,7 @@ _END: ...@@ -1633,7 +1635,7 @@ _END:
// raft state change -------------- // raft state change --------------
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
if (term > pSyncNode->raftStore.currentTerm) { if (term > raftStoreGetTerm(pSyncNode)) {
raftStoreSetTerm(pSyncNode, term); raftStoreSetTerm(pSyncNode, term);
char tmpBuf[64]; char tmpBuf[64];
snprintf(tmpBuf, sizeof(tmpBuf), "update term to %" PRId64, term); snprintf(tmpBuf, sizeof(tmpBuf), "update term to %" PRId64, term);
...@@ -1643,24 +1645,23 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { ...@@ -1643,24 +1645,23 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
} }
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) { void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
if (term > pSyncNode->raftStore.currentTerm) { if (term > raftStoreGetTerm(pSyncNode)) {
raftStoreSetTerm(pSyncNode, term); raftStoreSetTerm(pSyncNode, term);
} }
} }
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) { void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) {
if (pSyncNode->raftStore.currentTerm > newTerm) { SyncTerm currentTerm = raftStoreGetTerm(pSyncNode);
sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, if (currentTerm > newTerm) {
pSyncNode->raftStore.currentTerm); sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);
return; return;
} }
do { do {
sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);
pSyncNode->raftStore.currentTerm);
} while (0); } while (0);
if (pSyncNode->raftStore.currentTerm < newTerm) { if (currentTerm < newTerm) {
raftStoreSetTerm(pSyncNode, newTerm); raftStoreSetTerm(pSyncNode, newTerm);
char tmpBuf[64]; char tmpBuf[64];
snprintf(tmpBuf, sizeof(tmpBuf), "step down, update term to %" PRId64, newTerm); snprintf(tmpBuf, sizeof(tmpBuf), "step down, update term to %" PRId64, newTerm);
...@@ -1820,8 +1821,8 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) { ...@@ -1820,8 +1821,8 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
ASSERT(lastIndex >= 0); ASSERT(lastIndex >= 0);
sInfo("vgId:%d, become leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64 "", sInfo("vgId:%d, become leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64 "", pSyncNode->vgId,
pSyncNode->vgId, pSyncNode->raftStore.currentTerm, pSyncNode->commitIndex, lastIndex); raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex);
} }
bool syncNodeIsMnode(SSyncNode* pSyncNode) { return (pSyncNode->vgId == 1); } bool syncNodeIsMnode(SSyncNode* pSyncNode) { return (pSyncNode->vgId == 1); }
...@@ -1840,7 +1841,7 @@ void syncNodeFollower2Candidate(SSyncNode* pSyncNode) { ...@@ -1840,7 +1841,7 @@ void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE; pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
sInfo("vgId:%d, become candidate from follower. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64, sInfo("vgId:%d, become candidate from follower. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
pSyncNode->vgId, pSyncNode->raftStore.currentTerm, pSyncNode->commitIndex, lastIndex); pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex);
sNTrace(pSyncNode, "follower to candidate"); sNTrace(pSyncNode, "follower to candidate");
} }
...@@ -1850,7 +1851,7 @@ void syncNodeLeader2Follower(SSyncNode* pSyncNode) { ...@@ -1850,7 +1851,7 @@ void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
syncNodeBecomeFollower(pSyncNode, "leader to follower"); syncNodeBecomeFollower(pSyncNode, "leader to follower");
SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
sInfo("vgId:%d, become follower from leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64, sInfo("vgId:%d, become follower from leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
pSyncNode->vgId, pSyncNode->raftStore.currentTerm, pSyncNode->commitIndex, lastIndex); pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex);
sNTrace(pSyncNode, "leader to follower"); sNTrace(pSyncNode, "leader to follower");
} }
...@@ -1860,7 +1861,7 @@ void syncNodeCandidate2Follower(SSyncNode* pSyncNode) { ...@@ -1860,7 +1861,7 @@ void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
syncNodeBecomeFollower(pSyncNode, "candidate to follower"); syncNodeBecomeFollower(pSyncNode, "candidate to follower");
SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
sInfo("vgId:%d, become follower from candidate. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64, sInfo("vgId:%d, become follower from candidate. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
pSyncNode->vgId, pSyncNode->raftStore.currentTerm, pSyncNode->commitIndex, lastIndex); pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex);
sNTrace(pSyncNode, "candidate to follower"); sNTrace(pSyncNode, "candidate to follower");
} }
...@@ -1868,7 +1869,7 @@ void syncNodeCandidate2Follower(SSyncNode* pSyncNode) { ...@@ -1868,7 +1869,7 @@ void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
// just called by syncNodeVoteForSelf // just called by syncNodeVoteForSelf
// need assert // need assert
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) { void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
ASSERT(term == pSyncNode->raftStore.currentTerm); ASSERT(term == raftStoreGetTerm(pSyncNode));
bool voted = raftStoreHasVoted(pSyncNode); bool voted = raftStoreHasVoted(pSyncNode);
ASSERT(!voted); ASSERT(!voted);
...@@ -1876,8 +1877,8 @@ void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) ...@@ -1876,8 +1877,8 @@ void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId)
} }
// simulate get vote from outside // simulate get vote from outside
void syncNodeVoteForSelf(SSyncNode* pSyncNode) { void syncNodeVoteForSelf(SSyncNode* pSyncNode, SyncTerm currentTerm) {
syncNodeVoteForTerm(pSyncNode, pSyncNode->raftStore.currentTerm, &pSyncNode->myRaftId); syncNodeVoteForTerm(pSyncNode, currentTerm, &pSyncNode->myRaftId);
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
int32_t ret = syncBuildRequestVoteReply(&rpcMsg, pSyncNode->vgId); int32_t ret = syncBuildRequestVoteReply(&rpcMsg, pSyncNode->vgId);
...@@ -1886,7 +1887,7 @@ void syncNodeVoteForSelf(SSyncNode* pSyncNode) { ...@@ -1886,7 +1887,7 @@ void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
SyncRequestVoteReply* pMsg = rpcMsg.pCont; SyncRequestVoteReply* pMsg = rpcMsg.pCont;
pMsg->srcId = pSyncNode->myRaftId; pMsg->srcId = pSyncNode->myRaftId;
pMsg->destId = pSyncNode->myRaftId; pMsg->destId = pSyncNode->myRaftId;
pMsg->term = pSyncNode->raftStore.currentTerm; pMsg->term = currentTerm;
pMsg->voteGranted = true; pMsg->voteGranted = true;
voteGrantedVote(pSyncNode->pVotesGranted, pMsg); voteGrantedVote(pSyncNode->pVotesGranted, pMsg);
...@@ -2199,7 +2200,7 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { ...@@ -2199,7 +2200,7 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
SyncHeartbeat* pSyncMsg = rpcMsg.pCont; SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
pSyncMsg->srcId = pSyncNode->myRaftId; pSyncMsg->srcId = pSyncNode->myRaftId;
pSyncMsg->destId = pData->destId; pSyncMsg->destId = pData->destId;
pSyncMsg->term = pSyncNode->raftStore.currentTerm; pSyncMsg->term = raftStoreGetTerm(pSyncNode);
pSyncMsg->commitIndex = pSyncNode->commitIndex; pSyncMsg->commitIndex = pSyncNode->commitIndex;
pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode); pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
pSyncMsg->privateTerm = 0; pSyncMsg->privateTerm = 0;
...@@ -2238,30 +2239,6 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { ...@@ -2238,30 +2239,6 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
syncNodeRelease(pSyncNode); syncNodeRelease(pSyncNode);
} }
static int32_t syncNodeEqNoop(SSyncNode* pNode) {
if (pNode->state == TAOS_SYNC_STATE_LEADER) {
terrno = TSDB_CODE_SYN_NOT_LEADER;
return -1;
}
SyncIndex index = pNode->pLogStore->syncLogWriteIndex(pNode->pLogStore);
SyncTerm term = pNode->raftStore.currentTerm;
SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, pNode->vgId);
if (pEntry == NULL) return -1;
SRpcMsg rpcMsg = {0};
int32_t code = syncBuildClientRequestFromNoopEntry(&rpcMsg, pEntry, pNode->vgId);
syncEntryDestroy(pEntry);
sNTrace(pNode, "propose msg, type:noop");
code = (*pNode->syncEqMsg)(pNode->msgcb, &rpcMsg);
if (code != 0) {
sError("failed to propose noop msg while enqueue since %s", terrstr());
}
return code;
}
static void deleteCacheEntry(const void* key, size_t keyLen, void* value) { taosMemoryFree(value); } static void deleteCacheEntry(const void* key, size_t keyLen, void* value) { taosMemoryFree(value); }
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) { int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
...@@ -2291,7 +2268,7 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) { ...@@ -2291,7 +2268,7 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
if (syncLogBufferAppend(ths->pLogBuf, ths, pEntry) < 0) { if (syncLogBufferAppend(ths->pLogBuf, ths, pEntry) < 0) {
sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index); sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index);
terrno = TSDB_CODE_SYN_BUFFER_FULL; terrno = TSDB_CODE_SYN_BUFFER_FULL;
(void)syncLogFsmExecute(ths, ths->pFsm, ths->state, ths->raftStore.currentTerm, pEntry, TSDB_CODE_SYN_BUFFER_FULL); (void)syncLogFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, TSDB_CODE_SYN_BUFFER_FULL);
syncEntryDestroy(pEntry); syncEntryDestroy(pEntry);
return -1; return -1;
} }
...@@ -2364,7 +2341,7 @@ bool syncNodeSnapshotRecving(SSyncNode* pSyncNode) { ...@@ -2364,7 +2341,7 @@ bool syncNodeSnapshotRecving(SSyncNode* pSyncNode) {
static int32_t syncNodeAppendNoop(SSyncNode* ths) { static int32_t syncNodeAppendNoop(SSyncNode* ths) {
SyncIndex index = syncLogBufferGetEndIndex(ths->pLogBuf); SyncIndex index = syncLogBufferGetEndIndex(ths->pLogBuf);
SyncTerm term = ths->raftStore.currentTerm; SyncTerm term = raftStoreGetTerm(ths);
SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId); SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
if (pEntry == NULL) { if (pEntry == NULL) {
...@@ -2380,7 +2357,7 @@ static int32_t syncNodeAppendNoopOld(SSyncNode* ths) { ...@@ -2380,7 +2357,7 @@ static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
int32_t ret = 0; int32_t ret = 0;
SyncIndex index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore); SyncIndex index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
SyncTerm term = ths->raftStore.currentTerm; SyncTerm term = raftStoreGetTerm(ths);
SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId); SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
ASSERT(pEntry != NULL); ASSERT(pEntry != NULL);
...@@ -2418,16 +2395,17 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -2418,16 +2395,17 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
(void)syncBuildHeartbeatReply(&rpcMsg, ths->vgId); (void)syncBuildHeartbeatReply(&rpcMsg, ths->vgId);
SyncTerm currentTerm = raftStoreGetTerm(ths);
SyncHeartbeatReply* pMsgReply = rpcMsg.pCont; SyncHeartbeatReply* pMsgReply = rpcMsg.pCont;
pMsgReply->destId = pMsg->srcId; pMsgReply->destId = pMsg->srcId;
pMsgReply->srcId = ths->myRaftId; pMsgReply->srcId = ths->myRaftId;
pMsgReply->term = ths->raftStore.currentTerm; pMsgReply->term = currentTerm;
pMsgReply->privateTerm = 8864; // magic number pMsgReply->privateTerm = 8864; // magic number
pMsgReply->startTime = ths->startTime; pMsgReply->startTime = ths->startTime;
pMsgReply->timeStamp = tsMs; pMsgReply->timeStamp = tsMs;
if (pMsg->term == ths->raftStore.currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) { if (pMsg->term == currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs); syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs);
syncNodeResetElectTimer(ths); syncNodeResetElectTimer(ths);
...@@ -2456,7 +2434,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -2456,7 +2434,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
} }
} }
if (pMsg->term >= ths->raftStore.currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) { if (pMsg->term >= currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) {
// syncNodeStepDown(ths, pMsg->term); // syncNodeStepDown(ths, pMsg->term);
SRpcMsg rpcMsgLocalCmd = {0}; SRpcMsg rpcMsgLocalCmd = {0};
(void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId); (void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId);
...@@ -2565,7 +2543,7 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn ...@@ -2565,7 +2543,7 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn
int32_t code = 0; int32_t code = 0;
SyncIndex index = syncLogBufferGetEndIndex(ths->pLogBuf); SyncIndex index = syncLogBufferGetEndIndex(ths->pLogBuf);
SyncTerm term = ths->raftStore.currentTerm; SyncTerm term = raftStoreGetTerm(ths);
SSyncRaftEntry* pEntry = NULL; SSyncRaftEntry* pEntry = NULL;
if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
pEntry = syncEntryBuildFromClientRequest(pMsg->pCont, term, index); pEntry = syncEntryBuildFromClientRequest(pMsg->pCont, term, index);
...@@ -2609,73 +2587,6 @@ const char* syncStr(ESyncState state) { ...@@ -2609,73 +2587,6 @@ const char* syncStr(ESyncState state) {
} }
} }
#if 0
int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
if (ths->state != TAOS_SYNC_STATE_FOLLOWER) {
sNTrace(ths, "I am not follower, can not do leader transfer");
return 0;
}
if (!ths->restoreFinish) {
sNTrace(ths, "restore not finish, can not do leader transfer");
return 0;
}
if (pEntry->term < ths->raftStore.currentTerm) {
sNTrace(ths, "little term:%" PRId64 ", can not do leader transfer", pEntry->term);
return 0;
}
if (pEntry->index < syncNodeGetLastIndex(ths)) {
sNTrace(ths, "little index:%" PRId64 ", can not do leader transfer", pEntry->index);
return 0;
}
/*
if (ths->vgId > 1) {
sNTrace(ths, "I am vnode, can not do leader transfer");
return 0;
}
*/
SyncLeaderTransfer* pSyncLeaderTransfer = pRpcMsg->pCont;
sNTrace(ths, "do leader transfer, index:%" PRId64, pEntry->index);
bool sameId = syncUtilSameId(&(pSyncLeaderTransfer->newLeaderId), &(ths->myRaftId));
bool sameNodeInfo = strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 &&
pSyncLeaderTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort;
bool same = sameId || sameNodeInfo;
if (same) {
// reset elect timer now!
int32_t electMS = 1;
int32_t ret = syncNodeRestartElectTimer(ths, electMS);
ASSERT(ret == 0);
sNTrace(ths, "maybe leader transfer to %s:%d %" PRId64, pSyncLeaderTransfer->newNodeInfo.nodeFqdn,
pSyncLeaderTransfer->newNodeInfo.nodePort, pSyncLeaderTransfer->newLeaderId.addr);
}
if (ths->pFsm->FpLeaderTransferCb != NULL) {
SFsmCbMeta cbMeta = {
.code = 0,
.currentTerm = ths->raftStore.currentTerm,
.flag = 0,
.index = pEntry->index,
.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index),
.isWeak = pEntry->isWeak,
.seqNum = pEntry->seqNum,
.state = ths->state,
.term = pEntry->term,
};
ths->pFsm->FpLeaderTransferCb(ths->pFsm, pRpcMsg, &cbMeta);
}
return 0;
}
#endif
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) { int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
for (int32_t i = 0; i < pNewCfg->replicaNum; ++i) { for (int32_t i = 0; i < pNewCfg->replicaNum; ++i) {
SRaftId raftId = { SRaftId raftId = {
......
...@@ -176,7 +176,7 @@ int32_t syncBuildAppendEntriesFromRaftEntry(SSyncNode* pNode, SSyncRaftEntry* pE ...@@ -176,7 +176,7 @@ int32_t syncBuildAppendEntriesFromRaftEntry(SSyncNode* pNode, SSyncRaftEntry* pE
pMsg->prevLogTerm = prevLogTerm; pMsg->prevLogTerm = prevLogTerm;
pMsg->vgId = pNode->vgId; pMsg->vgId = pNode->vgId;
pMsg->srcId = pNode->myRaftId; pMsg->srcId = pNode->myRaftId;
pMsg->term = pNode->raftStore.currentTerm; pMsg->term = raftStoreGetTerm(pNode);
pMsg->commitIndex = pNode->commitIndex; pMsg->commitIndex = pNode->commitIndex;
pMsg->privateTerm = 0; pMsg->privateTerm = 0;
return 0; return 0;
......
...@@ -61,6 +61,7 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt ...@@ -61,6 +61,7 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
SSyncRaftEntry* pMatch = pBuf->entries[(index - 1 + pBuf->size) % pBuf->size].pItem; SSyncRaftEntry* pMatch = pBuf->entries[(index - 1 + pBuf->size) % pBuf->size].pItem;
ASSERTS(pMatch != NULL, "no matched log entry"); ASSERTS(pMatch != NULL, "no matched log entry");
ASSERT(pMatch->index + 1 == index); ASSERT(pMatch->index + 1 == index);
ASSERT(pMatch->term <= pEntry->term);
SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = pMatch->index, .prevLogTerm = pMatch->term}; SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = pMatch->index, .prevLogTerm = pMatch->term};
pBuf->entries[index % pBuf->size] = tmp; pBuf->entries[index % pBuf->size] = tmp;
...@@ -514,7 +515,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm ...@@ -514,7 +515,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
SSyncLogStore* pLogStore = pNode->pLogStore; SSyncLogStore* pLogStore = pNode->pLogStore;
SSyncFSM* pFsm = pNode->pFsm; SSyncFSM* pFsm = pNode->pFsm;
ESyncState role = pNode->state; ESyncState role = pNode->state;
SyncTerm term = pNode->raftStore.currentTerm; SyncTerm currentTerm = raftStoreGetTerm(pNode);
SyncGroupId vgId = pNode->vgId; SyncGroupId vgId = pNode->vgId;
int32_t ret = -1; int32_t ret = -1;
int64_t upperIndex = TMIN(commitIndex, pBuf->matchIndex); int64_t upperIndex = TMIN(commitIndex, pBuf->matchIndex);
...@@ -529,7 +530,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm ...@@ -529,7 +530,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
} }
sTrace("vgId:%d, commit. log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 "), role:%d, term:%" PRId64, sTrace("vgId:%d, commit. log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 "), role:%d, term:%" PRId64,
pNode->vgId, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex, role, term); pNode->vgId, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex, role, currentTerm);
// execute in fsm // execute in fsm
for (int64_t index = pBuf->commitIndex + 1; index <= upperIndex; index++) { for (int64_t index = pBuf->commitIndex + 1; index <= upperIndex; index++) {
...@@ -545,16 +546,16 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm ...@@ -545,16 +546,16 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
pEntry->term, TMSG_INFO(pEntry->originalRpcType)); pEntry->term, TMSG_INFO(pEntry->originalRpcType));
} }
if (syncLogFsmExecute(pNode, pFsm, role, term, pEntry, 0) != 0) { if (syncLogFsmExecute(pNode, pFsm, role, currentTerm, pEntry, 0) != 0) {
sError("vgId:%d, failed to execute sync log entry. index:%" PRId64 ", term:%" PRId64 sError("vgId:%d, failed to execute sync log entry. index:%" PRId64 ", term:%" PRId64
", role:%d, current term:%" PRId64, ", role:%d, current term:%" PRId64,
vgId, pEntry->index, pEntry->term, role, term); vgId, pEntry->index, pEntry->term, role, currentTerm);
goto _out; goto _out;
} }
pBuf->commitIndex = index; pBuf->commitIndex = index;
sTrace("vgId:%d, committed index:%" PRId64 ", term:%" PRId64 ", role:%d, current term:%" PRId64 "", pNode->vgId, sTrace("vgId:%d, committed index:%" PRId64 ", term:%" PRId64 ", role:%d, current term:%" PRId64 "", pNode->vgId,
pEntry->index, pEntry->term, role, term); pEntry->index, pEntry->term, role, currentTerm);
if (!inBuf) { if (!inBuf) {
syncEntryDestroy(pEntry); syncEntryDestroy(pEntry);
...@@ -576,7 +577,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm ...@@ -576,7 +577,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
_out: _out:
// mark as restored if needed // mark as restored if needed
if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex && pEntry != NULL && if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex && pEntry != NULL &&
pNode->raftStore.currentTerm <= pEntry->term) { currentTerm <= pEntry->term) {
pNode->pFsm->FpRestoreFinishCb(pNode->pFsm); pNode->pFsm->FpRestoreFinishCb(pNode->pFsm);
pNode->restoreFinish = true; pNode->restoreFinish = true;
sInfo("vgId:%d, restore finished. log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, sInfo("vgId:%d, restore finished. log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId,
......
...@@ -18,6 +18,9 @@ ...@@ -18,6 +18,9 @@
#include "syncUtil.h" #include "syncUtil.h"
#include "tjson.h" #include "tjson.h"
int32_t raftStoreReadFile(SSyncNode *pNode);
int32_t raftStoreWriteFile(SSyncNode *pNode);
static int32_t raftStoreDecode(const SJson *pJson, SRaftStore *pStore) { static int32_t raftStoreDecode(const SJson *pJson, SRaftStore *pStore) {
int32_t code = 0; int32_t code = 0;
...@@ -150,27 +153,53 @@ _OVER: ...@@ -150,27 +153,53 @@ _OVER:
return code; return code;
} }
int32_t raftStoreOpen(SSyncNode *pNode) {
taosThreadMutexInit(&pNode->raftStore.mutex, NULL);
return raftStoreReadFile(pNode);
}
void raftStoreClose(SSyncNode *pNode) { taosThreadMutexDestroy(&pNode->raftStore.mutex); }
bool raftStoreHasVoted(SSyncNode *pNode) { bool raftStoreHasVoted(SSyncNode *pNode) {
taosThreadMutexLock(&pNode->raftStore.mutex);
bool b = syncUtilEmptyId(&pNode->raftStore.voteFor); bool b = syncUtilEmptyId(&pNode->raftStore.voteFor);
taosThreadMutexUnlock(&pNode->raftStore.mutex);
return (!b); return (!b);
} }
void raftStoreVote(SSyncNode *pNode, SRaftId *pRaftId) { void raftStoreVote(SSyncNode *pNode, SRaftId *pRaftId) {
taosThreadMutexLock(&pNode->raftStore.mutex);
pNode->raftStore.voteFor = *pRaftId; pNode->raftStore.voteFor = *pRaftId;
(void)raftStoreWriteFile(pNode); (void)raftStoreWriteFile(pNode);
taosThreadMutexUnlock(&pNode->raftStore.mutex);
} }
void raftStoreClearVote(SSyncNode *pNode) { void raftStoreClearVote(SSyncNode *pNode) {
taosThreadMutexLock(&pNode->raftStore.mutex);
pNode->raftStore.voteFor = EMPTY_RAFT_ID; pNode->raftStore.voteFor = EMPTY_RAFT_ID;
(void)raftStoreWriteFile(pNode); (void)raftStoreWriteFile(pNode);
taosThreadMutexUnlock(&pNode->raftStore.mutex);
} }
void raftStoreNextTerm(SSyncNode *pNode) { void raftStoreNextTerm(SSyncNode *pNode) {
taosThreadMutexLock(&pNode->raftStore.mutex);
pNode->raftStore.currentTerm++; pNode->raftStore.currentTerm++;
(void)raftStoreWriteFile(pNode); (void)raftStoreWriteFile(pNode);
taosThreadMutexUnlock(&pNode->raftStore.mutex);
} }
void raftStoreSetTerm(SSyncNode *pNode, SyncTerm term) { void raftStoreSetTerm(SSyncNode *pNode, SyncTerm term) {
pNode->raftStore.currentTerm = term; taosThreadMutexLock(&pNode->raftStore.mutex);
(void)raftStoreWriteFile(pNode); if (pNode->raftStore.currentTerm < term) {
pNode->raftStore.currentTerm = term;
(void)raftStoreWriteFile(pNode);
}
taosThreadMutexUnlock(&pNode->raftStore.mutex);
}
SyncTerm raftStoreGetTerm(SSyncNode *pNode) {
taosThreadMutexLock(&pNode->raftStore.mutex);
SyncTerm term = pNode->raftStore.currentTerm;
taosThreadMutexUnlock(&pNode->raftStore.mutex);
return term;
} }
...@@ -107,7 +107,7 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) { ...@@ -107,7 +107,7 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) {
SyncHeartbeat* pSyncMsg = rpcMsg.pCont; SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
pSyncMsg->srcId = pSyncNode->myRaftId; pSyncMsg->srcId = pSyncNode->myRaftId;
pSyncMsg->destId = pSyncNode->peersId[i]; pSyncMsg->destId = pSyncNode->peersId[i];
pSyncMsg->term = pSyncNode->raftStore.currentTerm; pSyncMsg->term = raftStoreGetTerm(pSyncNode);
pSyncMsg->commitIndex = pSyncNode->commitIndex; pSyncMsg->commitIndex = pSyncNode->commitIndex;
pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode); pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
pSyncMsg->privateTerm = 0; pSyncMsg->privateTerm = 0;
......
...@@ -97,15 +97,14 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -97,15 +97,14 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
} }
bool logOK = syncNodeOnRequestVoteLogOK(ths, pMsg); bool logOK = syncNodeOnRequestVoteLogOK(ths, pMsg);
// maybe update term // maybe update term
if (pMsg->term > ths->raftStore.currentTerm) { if (pMsg->term > raftStoreGetTerm(ths)) {
syncNodeStepDown(ths, pMsg->term); syncNodeStepDown(ths, pMsg->term);
// syncNodeUpdateTerm(ths, pMsg->term);
} }
ASSERT(pMsg->term <= ths->raftStore.currentTerm); SyncTerm currentTerm = raftStoreGetTerm(ths);
ASSERT(pMsg->term <= currentTerm);
bool grant = (pMsg->term == ths->raftStore.currentTerm) && logOK && bool grant = (pMsg->term == currentTerm) && logOK &&
((!raftStoreHasVoted(ths)) || (syncUtilSameId(&ths->raftStore.voteFor, &pMsg->srcId))); ((!raftStoreHasVoted(ths)) || (syncUtilSameId(&ths->raftStore.voteFor, &pMsg->srcId)));
if (grant) { if (grant) {
// maybe has already voted for pMsg->srcId // maybe has already voted for pMsg->srcId
...@@ -113,7 +112,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -113,7 +112,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
raftStoreVote(ths, &(pMsg->srcId)); raftStoreVote(ths, &(pMsg->srcId));
// candidate ? // candidate ?
syncNodeStepDown(ths, ths->raftStore.currentTerm); syncNodeStepDown(ths, currentTerm);
// forbid elect for this round // forbid elect for this round
syncNodeResetElectTimer(ths); syncNodeResetElectTimer(ths);
...@@ -127,8 +126,9 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -127,8 +126,9 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
SyncRequestVoteReply* pReply = rpcMsg.pCont; SyncRequestVoteReply* pReply = rpcMsg.pCont;
pReply->srcId = ths->myRaftId; pReply->srcId = ths->myRaftId;
pReply->destId = pMsg->srcId; pReply->destId = pMsg->srcId;
pReply->term = ths->raftStore.currentTerm; pReply->term = currentTerm;
pReply->voteGranted = grant; pReply->voteGranted = grant;
ASSERT(!grant || pMsg->term == pReply->term);
// trace log // trace log
syncLogRecvRequestVote(ths, pMsg, pReply->voteGranted, ""); syncLogRecvRequestVote(ths, pMsg, pReply->voteGranted, "");
......
...@@ -47,27 +47,21 @@ int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -47,27 +47,21 @@ int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
syncLogRecvRequestVoteReply(ths, pMsg, "not in my config"); syncLogRecvRequestVoteReply(ths, pMsg, "not in my config");
return -1; return -1;
} }
SyncTerm currentTerm = raftStoreGetTerm(ths);
// drop stale response // drop stale response
if (pMsg->term < ths->raftStore.currentTerm) { if (pMsg->term < currentTerm) {
syncLogRecvRequestVoteReply(ths, pMsg, "drop stale response"); syncLogRecvRequestVoteReply(ths, pMsg, "drop stale response");
return -1; return -1;
} }
// ASSERT(!(pMsg->term > ths->raftStore.currentTerm)); if (pMsg->term > currentTerm) {
// no need this code, because if I receive reply.term, then I must have sent for that term.
// if (pMsg->term > ths->raftStore.currentTerm) {
// syncNodeUpdateTerm(ths, pMsg->term);
// }
if (pMsg->term > ths->raftStore.currentTerm) {
syncLogRecvRequestVoteReply(ths, pMsg, "error term"); syncLogRecvRequestVoteReply(ths, pMsg, "error term");
syncNodeStepDown(ths, pMsg->term); syncNodeStepDown(ths, pMsg->term);
return -1; return -1;
} }
syncLogRecvRequestVoteReply(ths, pMsg, ""); syncLogRecvRequestVoteReply(ths, pMsg, "");
ASSERT(pMsg->term == ths->raftStore.currentTerm); ASSERT(pMsg->term == currentTerm);
// This tallies votes even when the current state is not Candidate, // This tallies votes even when the current state is not Candidate,
// but they won't be looked at, so it doesn't matter. // but they won't be looked at, so it doesn't matter.
......
...@@ -143,7 +143,7 @@ static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) { ...@@ -143,7 +143,7 @@ static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
.state = pNode->state, .state = pNode->state,
.seqNum = *pSeqNum, .seqNum = *pSeqNum,
.term = SYNC_TERM_INVALID, .term = SYNC_TERM_INVALID,
.currentTerm = pNode->raftStore.currentTerm, .currentTerm = SYNC_TERM_INVALID,
.flag = 0, .flag = 0,
}; };
......
...@@ -43,7 +43,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI ...@@ -43,7 +43,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS; pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS;
pSender->pSyncNode = pSyncNode; pSender->pSyncNode = pSyncNode;
pSender->replicaIndex = replicaIndex; pSender->replicaIndex = replicaIndex;
pSender->term = pSyncNode->raftStore.currentTerm; pSender->term = raftStoreGetTerm(pSyncNode);
pSender->startTime = 0; pSender->startTime = 0;
pSender->endTime = 0; pSender->endTime = 0;
pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot); pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot);
...@@ -90,7 +90,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { ...@@ -90,7 +90,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
memset(&pSender->lastConfig, 0, sizeof(pSender->lastConfig)); memset(&pSender->lastConfig, 0, sizeof(pSender->lastConfig));
pSender->sendingMS = 0; pSender->sendingMS = 0;
pSender->term = pSender->pSyncNode->raftStore.currentTerm; pSender->term = raftStoreGetTerm(pSender->pSyncNode);
pSender->startTime = taosGetTimestampMs(); pSender->startTime = taosGetTimestampMs();
pSender->lastSendTime = pSender->startTime; pSender->lastSendTime = pSender->startTime;
pSender->finish = false; pSender->finish = false;
...@@ -105,7 +105,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { ...@@ -105,7 +105,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
SyncSnapshotSend *pMsg = rpcMsg.pCont; SyncSnapshotSend *pMsg = rpcMsg.pCont;
pMsg->srcId = pSender->pSyncNode->myRaftId; pMsg->srcId = pSender->pSyncNode->myRaftId;
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
pMsg->term = pSender->pSyncNode->raftStore.currentTerm; pMsg->term = raftStoreGetTerm(pSender->pSyncNode);
pMsg->beginIndex = pSender->snapshotParam.start; pMsg->beginIndex = pSender->snapshotParam.start;
pMsg->lastIndex = pSender->snapshot.lastApplyIndex; pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
pMsg->lastTerm = pSender->snapshot.lastApplyTerm; pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
...@@ -185,7 +185,7 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) { ...@@ -185,7 +185,7 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
SyncSnapshotSend *pMsg = rpcMsg.pCont; SyncSnapshotSend *pMsg = rpcMsg.pCont;
pMsg->srcId = pSender->pSyncNode->myRaftId; pMsg->srcId = pSender->pSyncNode->myRaftId;
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
pMsg->term = pSender->pSyncNode->raftStore.currentTerm; pMsg->term = raftStoreGetTerm(pSender->pSyncNode);
pMsg->beginIndex = pSender->snapshotParam.start; pMsg->beginIndex = pSender->snapshotParam.start;
pMsg->lastIndex = pSender->snapshot.lastApplyIndex; pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
pMsg->lastTerm = pSender->snapshot.lastApplyTerm; pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
...@@ -226,7 +226,7 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { ...@@ -226,7 +226,7 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
SyncSnapshotSend *pMsg = rpcMsg.pCont; SyncSnapshotSend *pMsg = rpcMsg.pCont;
pMsg->srcId = pSender->pSyncNode->myRaftId; pMsg->srcId = pSender->pSyncNode->myRaftId;
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
pMsg->term = pSender->pSyncNode->raftStore.currentTerm; pMsg->term = raftStoreGetTerm(pSender->pSyncNode);
pMsg->beginIndex = pSender->snapshotParam.start; pMsg->beginIndex = pSender->snapshotParam.start;
pMsg->lastIndex = pSender->snapshot.lastApplyIndex; pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
pMsg->lastTerm = pSender->snapshot.lastApplyTerm; pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
...@@ -314,7 +314,7 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from ...@@ -314,7 +314,7 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from
pReceiver->pWriter = NULL; pReceiver->pWriter = NULL;
pReceiver->pSyncNode = pSyncNode; pReceiver->pSyncNode = pSyncNode;
pReceiver->fromId = fromId; pReceiver->fromId = fromId;
pReceiver->term = pSyncNode->raftStore.currentTerm; pReceiver->term = raftStoreGetTerm(pSyncNode);
pReceiver->snapshot.data = NULL; pReceiver->snapshot.data = NULL;
pReceiver->snapshot.lastApplyIndex = SYNC_INDEX_INVALID; pReceiver->snapshot.lastApplyIndex = SYNC_INDEX_INVALID;
pReceiver->snapshot.lastApplyTerm = 0; pReceiver->snapshot.lastApplyTerm = 0;
...@@ -380,7 +380,7 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *p ...@@ -380,7 +380,7 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *p
pReceiver->start = true; pReceiver->start = true;
pReceiver->ack = SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT; pReceiver->ack = SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT;
pReceiver->term = pReceiver->pSyncNode->raftStore.currentTerm; pReceiver->term = raftStoreGetTerm(pReceiver->pSyncNode);
pReceiver->fromId = pPreMsg->srcId; pReceiver->fromId = pPreMsg->srcId;
pReceiver->startTime = pPreMsg->startTime; pReceiver->startTime = pPreMsg->startTime;
...@@ -437,9 +437,8 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap ...@@ -437,9 +437,8 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap
} }
// maybe update term // maybe update term
if (pReceiver->snapshot.lastApplyTerm > pReceiver->pSyncNode->raftStore.currentTerm) { if (pReceiver->snapshot.lastApplyTerm > raftStoreGetTerm(pReceiver->pSyncNode)) {
pReceiver->pSyncNode->raftStore.currentTerm = pReceiver->snapshot.lastApplyTerm; raftStoreSetTerm(pReceiver->pSyncNode, pReceiver->snapshot.lastApplyTerm);
(void)raftStoreWriteFile(pReceiver->pSyncNode);
} }
// stop writer, apply data // stop writer, apply data
...@@ -584,7 +583,7 @@ _SEND_REPLY: ...@@ -584,7 +583,7 @@ _SEND_REPLY:
SyncSnapshotRsp *pRspMsg = rpcMsg.pCont; SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
pRspMsg->srcId = pSyncNode->myRaftId; pRspMsg->srcId = pSyncNode->myRaftId;
pRspMsg->destId = pMsg->srcId; pRspMsg->destId = pMsg->srcId;
pRspMsg->term = pSyncNode->raftStore.currentTerm; pRspMsg->term = raftStoreGetTerm(pSyncNode);
pRspMsg->lastIndex = pMsg->lastIndex; pRspMsg->lastIndex = pMsg->lastIndex;
pRspMsg->lastTerm = pMsg->lastTerm; pRspMsg->lastTerm = pMsg->lastTerm;
pRspMsg->startTime = pReceiver->startTime; pRspMsg->startTime = pReceiver->startTime;
...@@ -640,7 +639,7 @@ _SEND_REPLY: ...@@ -640,7 +639,7 @@ _SEND_REPLY:
SyncSnapshotRsp *pRspMsg = rpcMsg.pCont; SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
pRspMsg->srcId = pSyncNode->myRaftId; pRspMsg->srcId = pSyncNode->myRaftId;
pRspMsg->destId = pMsg->srcId; pRspMsg->destId = pMsg->srcId;
pRspMsg->term = pSyncNode->raftStore.currentTerm; pRspMsg->term = raftStoreGetTerm(pSyncNode);
pRspMsg->lastIndex = pMsg->lastIndex; pRspMsg->lastIndex = pMsg->lastIndex;
pRspMsg->lastTerm = pMsg->lastTerm; pRspMsg->lastTerm = pMsg->lastTerm;
pRspMsg->startTime = pReceiver->startTime; pRspMsg->startTime = pReceiver->startTime;
...@@ -690,7 +689,7 @@ static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend ...@@ -690,7 +689,7 @@ static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend
SyncSnapshotRsp *pRspMsg = rpcMsg.pCont; SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
pRspMsg->srcId = pSyncNode->myRaftId; pRspMsg->srcId = pSyncNode->myRaftId;
pRspMsg->destId = pMsg->srcId; pRspMsg->destId = pMsg->srcId;
pRspMsg->term = pSyncNode->raftStore.currentTerm; pRspMsg->term = raftStoreGetTerm(pSyncNode);
pRspMsg->lastIndex = pMsg->lastIndex; pRspMsg->lastIndex = pMsg->lastIndex;
pRspMsg->lastTerm = pMsg->lastTerm; pRspMsg->lastTerm = pMsg->lastTerm;
pRspMsg->startTime = pReceiver->startTime; pRspMsg->startTime = pReceiver->startTime;
...@@ -737,7 +736,7 @@ static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMs ...@@ -737,7 +736,7 @@ static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMs
SyncSnapshotRsp *pRspMsg = rpcMsg.pCont; SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
pRspMsg->srcId = pSyncNode->myRaftId; pRspMsg->srcId = pSyncNode->myRaftId;
pRspMsg->destId = pMsg->srcId; pRspMsg->destId = pMsg->srcId;
pRspMsg->term = pSyncNode->raftStore.currentTerm; pRspMsg->term = raftStoreGetTerm(pSyncNode);
pRspMsg->lastIndex = pMsg->lastIndex; pRspMsg->lastIndex = pMsg->lastIndex;
pRspMsg->lastTerm = pMsg->lastTerm; pRspMsg->lastTerm = pMsg->lastTerm;
pRspMsg->startTime = pReceiver->startTime; pRspMsg->startTime = pReceiver->startTime;
...@@ -786,13 +785,13 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { ...@@ -786,13 +785,13 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
return -1; return -1;
} }
if (pMsg->term < pSyncNode->raftStore.currentTerm) { if (pMsg->term < raftStoreGetTerm(pSyncNode)) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "reject since small term"); syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "reject since small term");
terrno = TSDB_CODE_SYN_INTERNAL_ERROR; terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1; return -1;
} }
if (pMsg->term > pSyncNode->raftStore.currentTerm) { if (pMsg->term > raftStoreGetTerm(pSyncNode)) {
syncNodeStepDown(pSyncNode, pMsg->term); syncNodeStepDown(pSyncNode, pMsg->term);
} }
syncNodeResetElectTimer(pSyncNode); syncNodeResetElectTimer(pSyncNode);
...@@ -800,7 +799,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { ...@@ -800,7 +799,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
// state, term, seq/ack // state, term, seq/ack
int32_t code = 0; int32_t code = 0;
if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) { if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
if (pMsg->term == pSyncNode->raftStore.currentTerm) { if (pMsg->term == raftStoreGetTerm(pSyncNode)) {
if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT) { if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq pre-snapshot"); syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq pre-snapshot");
code = syncNodeOnSnapshotPrep(pSyncNode, pMsg); code = syncNodeOnSnapshotPrep(pSyncNode, pMsg);
...@@ -884,7 +883,7 @@ static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSend ...@@ -884,7 +883,7 @@ static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSend
SyncSnapshotSend *pSendMsg = rpcMsg.pCont; SyncSnapshotSend *pSendMsg = rpcMsg.pCont;
pSendMsg->srcId = pSender->pSyncNode->myRaftId; pSendMsg->srcId = pSender->pSyncNode->myRaftId;
pSendMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; pSendMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
pSendMsg->term = pSender->pSyncNode->raftStore.currentTerm; pSendMsg->term = raftStoreGetTerm(pSender->pSyncNode);
pSendMsg->beginIndex = pSender->snapshotParam.start; pSendMsg->beginIndex = pSender->snapshotParam.start;
pSendMsg->lastIndex = pSender->snapshot.lastApplyIndex; pSendMsg->lastIndex = pSender->snapshot.lastApplyIndex;
pSendMsg->lastTerm = pSender->snapshot.lastApplyTerm; pSendMsg->lastTerm = pSender->snapshot.lastApplyTerm;
...@@ -943,10 +942,11 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { ...@@ -943,10 +942,11 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
goto _ERROR; goto _ERROR;
} }
if (pMsg->term != pSyncNode->raftStore.currentTerm) { SyncTerm currentTerm = raftStoreGetTerm(pSyncNode);
if (pMsg->term != currentTerm) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender and receiver term not match"); syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender and receiver term not match");
sSError(pSender, "snapshot sender term not equal, msg term:%" PRId64 " currentTerm:%" PRId64, pMsg->term, sSError(pSender, "snapshot sender term not equal, msg term:%" PRId64 " currentTerm:%" PRId64, pMsg->term,
pSyncNode->raftStore.currentTerm); currentTerm);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR; terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _ERROR; goto _ERROR;
} }
......
...@@ -154,7 +154,7 @@ static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) { ...@@ -154,7 +154,7 @@ static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...) { void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...) {
if (pNode == NULL || pNode->pLogStore == NULL) return; if (pNode == NULL || pNode->pLogStore == NULL) return;
int64_t currentTerm = pNode->raftStore.currentTerm; int64_t currentTerm = raftStoreGetTerm(pNode);
// save error code, otherwise it will be overwritten // save error code, otherwise it will be overwritten
int32_t errCode = terrno; int32_t errCode = terrno;
...@@ -260,7 +260,7 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla ...@@ -260,7 +260,7 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla
pNode->vgId, eventLog, syncStr(pNode->state), pSender, pSender->snapshotParam.start, pNode->vgId, eventLog, syncStr(pNode->state), pSender, pSender->snapshotParam.start,
pSender->snapshotParam.end, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshotParam.end, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm,
pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack, pSender->finish, pSender->replicaIndex, pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack, pSender->finish, pSender->replicaIndex,
DID(&pNode->replicasId[pSender->replicaIndex]), pNode->raftStore.currentTerm, pNode->commitIndex, DID(&pNode->replicasId[pSender->replicaIndex]), raftStoreGetTerm(pNode), pNode->commitIndex,
logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
pNode->raftCfg.isStandBy, pNode->raftCfg.batchSize, pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->raftCfg.isStandBy, pNode->raftCfg.batchSize, pNode->replicaNum, pNode->raftCfg.lastConfigIndex,
pNode->changing, pNode->restoreFinish, syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock, pNode->changing, pNode->restoreFinish, syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock,
...@@ -308,7 +308,7 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df ...@@ -308,7 +308,7 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df
pNode->vgId, eventLog, syncStr(pNode->state), pReceiver, pReceiver->start, pReceiver->ack, pReceiver->term, pNode->vgId, eventLog, syncStr(pNode->state), pReceiver, pReceiver->start, pReceiver->ack, pReceiver->term,
pReceiver->startTime, DID(&pReceiver->fromId), pReceiver->snapshotParam.start, pReceiver->snapshotParam.end, pReceiver->startTime, DID(&pReceiver->fromId), pReceiver->snapshotParam.start, pReceiver->snapshotParam.end,
pReceiver->snapshot.lastApplyIndex, pReceiver->snapshot.lastApplyTerm, pReceiver->snapshot.lastConfigIndex, pReceiver->snapshot.lastApplyIndex, pReceiver->snapshot.lastApplyTerm, pReceiver->snapshot.lastConfigIndex,
pNode->raftStore.currentTerm, pNode->commitIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex, raftStoreGetTerm(pNode), pNode->commitIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex,
snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->raftCfg.isStandBy, pNode->raftCfg.batchSize, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->raftCfg.isStandBy, pNode->raftCfg.batchSize,
pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish,
syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr); syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr);
......
...@@ -199,7 +199,7 @@ inline char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { ...@@ -199,7 +199,7 @@ inline char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
", sby:%d, " ", sby:%d, "
"r-num:%d, " "r-num:%d, "
"lcfg:%" PRId64 ", chging:%d, rsto:%d", "lcfg:%" PRId64 ", chging:%d, rsto:%d",
pSyncNode->vgId, syncStr(pSyncNode->state), pSyncNode->raftStore.currentTerm, pSyncNode->commitIndex, pSyncNode->vgId, syncStr(pSyncNode->state), raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex,
logBeginIndex, logLastIndex, snapshot.lastApplyIndex, pSyncNode->raftCfg.isStandBy, pSyncNode->replicaNum, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, pSyncNode->raftCfg.isStandBy, pSyncNode->replicaNum,
pSyncNode->raftCfg.lastConfigIndex, pSyncNode->changing, pSyncNode->restoreFinish); pSyncNode->raftCfg.lastConfigIndex, pSyncNode->changing, pSyncNode->restoreFinish);
......
...@@ -137,7 +137,7 @@ int32_t syncNodeOnPreSnapshot(SSyncNode *ths, SyncPreSnapshot *pMsg) { ...@@ -137,7 +137,7 @@ int32_t syncNodeOnPreSnapshot(SSyncNode *ths, SyncPreSnapshot *pMsg) {
SyncPreSnapshotReply *pMsgReply = syncPreSnapshotReplyBuild(ths->vgId); SyncPreSnapshotReply *pMsgReply = syncPreSnapshotReplyBuild(ths->vgId);
pMsgReply->srcId = ths->myRaftId; pMsgReply->srcId = ths->myRaftId;
pMsgReply->destId = pMsg->srcId; pMsgReply->destId = pMsg->srcId;
pMsgReply->term = ths->raftStore.currentTerm; pMsgReply->term = raftStoreGetTerm(ths);
SSyncLogStoreData *pData = ths->pLogStore->data; SSyncLogStoreData *pData = ths->pLogStore->data;
SWal *pWal = pData->pWal; SWal *pWal = pData->pWal;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册