提交 e23c969d 编写于 作者: M Minghao Li

refactor(sync): make leader life longer

上级 e99782f4
...@@ -1562,7 +1562,7 @@ char* syncNode2Str(const SSyncNode* pSyncNode) { ...@@ -1562,7 +1562,7 @@ char* syncNode2Str(const SSyncNode* pSyncNode) {
return serialized; return serialized;
} }
void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
int32_t userStrLen = strlen(str); int32_t userStrLen = strlen(str);
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
...@@ -1634,7 +1634,7 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { ...@@ -1634,7 +1634,7 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
taosMemoryFree(pCfgStr); taosMemoryFree(pCfgStr);
} }
void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) { inline void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) {
int32_t userStrLen = strlen(str); int32_t userStrLen = strlen(str);
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
...@@ -1701,7 +1701,7 @@ void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) { ...@@ -1701,7 +1701,7 @@ void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) {
taosMemoryFree(pCfgStr); taosMemoryFree(pCfgStr);
} }
char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { inline char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
int len = 256; int len = 256;
char* s = (char*)taosMemoryMalloc(len); char* s = (char*)taosMemoryMalloc(len);
...@@ -1724,7 +1724,7 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { ...@@ -1724,7 +1724,7 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
return s; return s;
} }
bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) { inline bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) {
bool b1 = false; bool b1 = false;
bool b2 = false; bool b2 = false;
...@@ -2942,7 +2942,7 @@ bool syncNodeCanChange(SSyncNode* pSyncNode) { ...@@ -2942,7 +2942,7 @@ bool syncNodeCanChange(SSyncNode* pSyncNode) {
return true; return true;
} }
void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) { inline void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) {
char host[64]; char host[64];
uint16_t port; uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
...@@ -2953,7 +2953,7 @@ void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, c ...@@ -2953,7 +2953,7 @@ void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, c
syncNodeEventLog(pSyncNode, logBuf); syncNodeEventLog(pSyncNode, logBuf);
} }
void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) { inline void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) {
char logBuf[256]; char logBuf[256];
char host[64]; char host[64];
uint16_t port; uint16_t port;
...@@ -2964,7 +2964,7 @@ void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, c ...@@ -2964,7 +2964,7 @@ void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, c
syncNodeEventLog(pSyncNode, logBuf); syncNodeEventLog(pSyncNode, logBuf);
} }
void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) { inline void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
char host[64]; char host[64];
uint16_t port; uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
...@@ -2974,7 +2974,7 @@ void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteRepl ...@@ -2974,7 +2974,7 @@ void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteRepl
syncNodeEventLog(pSyncNode, logBuf); syncNodeEventLog(pSyncNode, logBuf);
} }
void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) { inline void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
char host[64]; char host[64];
uint16_t port; uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
...@@ -2984,7 +2984,7 @@ void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteRepl ...@@ -2984,7 +2984,7 @@ void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteRepl
syncNodeEventLog(pSyncNode, logBuf); syncNodeEventLog(pSyncNode, logBuf);
} }
void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) { inline void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
char host[64]; char host[64];
uint16_t port; uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
...@@ -2999,7 +2999,7 @@ void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs ...@@ -2999,7 +2999,7 @@ void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs
syncNodeEventLog(pSyncNode, logBuf); syncNodeEventLog(pSyncNode, logBuf);
} }
void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) { inline void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
char host[64]; char host[64];
uint16_t port; uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
...@@ -3014,7 +3014,7 @@ void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs ...@@ -3014,7 +3014,7 @@ void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs
syncNodeEventLog(pSyncNode, logBuf); syncNodeEventLog(pSyncNode, logBuf);
} }
void syncLogSendAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s) { inline void syncLogSendAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s) {
char host[64]; char host[64];
uint16_t port; uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
...@@ -3027,7 +3027,7 @@ void syncLogSendAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntries ...@@ -3027,7 +3027,7 @@ void syncLogSendAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntries
syncNodeEventLog(pSyncNode, logBuf); syncNodeEventLog(pSyncNode, logBuf);
} }
void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s) { inline void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s) {
char host[64]; char host[64];
uint16_t port; uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
...@@ -3040,7 +3040,7 @@ void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntries ...@@ -3040,7 +3040,7 @@ void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntries
syncNodeEventLog(pSyncNode, logBuf); syncNodeEventLog(pSyncNode, logBuf);
} }
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) { inline void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
char host[64]; char host[64];
uint16_t port; uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
...@@ -3052,7 +3052,7 @@ void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries ...@@ -3052,7 +3052,7 @@ void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries
syncNodeEventLog(pSyncNode, logBuf); syncNodeEventLog(pSyncNode, logBuf);
} }
void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) { inline void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
char host[64]; char host[64];
uint16_t port; uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
......
...@@ -51,15 +51,21 @@ int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) { ...@@ -51,15 +51,21 @@ int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {
return -1; return -1;
} }
bool logOK = (pMsg->lastLogTerm > ths->pLogStore->getLastTerm(ths->pLogStore)) ||
((pMsg->lastLogTerm == ths->pLogStore->getLastTerm(ths->pLogStore)) &&
(pMsg->lastLogIndex >= ths->pLogStore->getLastIndex(ths->pLogStore)));
// log not ok, do not update term, ignore it
if (pMsg->term > ths->pRaftStore->currentTerm && !logOK) {
return -1;
}
// maybe update term // maybe update term
if (pMsg->term > ths->pRaftStore->currentTerm) { if (pMsg->term > ths->pRaftStore->currentTerm) {
syncNodeUpdateTerm(ths, pMsg->term); syncNodeUpdateTerm(ths, pMsg->term);
} }
ASSERT(pMsg->term <= ths->pRaftStore->currentTerm); ASSERT(pMsg->term <= ths->pRaftStore->currentTerm);
bool logOK = (pMsg->lastLogTerm > ths->pLogStore->getLastTerm(ths->pLogStore)) ||
((pMsg->lastLogTerm == ths->pLogStore->getLastTerm(ths->pLogStore)) &&
(pMsg->lastLogIndex >= ths->pLogStore->getLastIndex(ths->pLogStore)));
bool grant = (pMsg->term == ths->pRaftStore->currentTerm) && logOK && bool grant = (pMsg->term == ths->pRaftStore->currentTerm) && logOK &&
((!raftStoreHasVoted(ths->pRaftStore)) || (syncUtilSameId(&(ths->pRaftStore->voteFor), &(pMsg->srcId)))); ((!raftStoreHasVoted(ths->pRaftStore)) || (syncUtilSameId(&(ths->pRaftStore->voteFor), &(pMsg->srcId))));
if (grant) { if (grant) {
...@@ -94,48 +100,6 @@ int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) { ...@@ -94,48 +100,6 @@ int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {
return ret; return ret;
} }
#if 0
int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {
int32_t ret = 0;
char logBuf[128] = {0};
snprintf(logBuf, sizeof(logBuf), "==syncNodeOnRequestVoteCb== term:%" PRIu64, ths->pRaftStore->currentTerm);
syncRequestVoteLog2(logBuf, pMsg);
if (pMsg->term > ths->pRaftStore->currentTerm) {
syncNodeUpdateTerm(ths, pMsg->term);
}
ASSERT(pMsg->term <= ths->pRaftStore->currentTerm);
bool logOK = (pMsg->lastLogTerm > ths->pLogStore->getLastTerm(ths->pLogStore)) ||
((pMsg->lastLogTerm == ths->pLogStore->getLastTerm(ths->pLogStore)) &&
(pMsg->lastLogIndex >= ths->pLogStore->getLastIndex(ths->pLogStore)));
bool grant = (pMsg->term == ths->pRaftStore->currentTerm) && logOK &&
((!raftStoreHasVoted(ths->pRaftStore)) || (syncUtilSameId(&(ths->pRaftStore->voteFor), &(pMsg->srcId))));
if (grant) {
// maybe has already voted for pMsg->srcId
// vote again, no harm
raftStoreVote(ths->pRaftStore, &(pMsg->srcId));
// forbid elect for this round
syncNodeResetElectTimer(ths);
}
SyncRequestVoteReply* pReply = syncRequestVoteReplyBuild(ths->vgId);
pReply->srcId = ths->myRaftId;
pReply->destId = pMsg->srcId;
pReply->term = ths->pRaftStore->currentTerm;
pReply->voteGranted = grant;
SRpcMsg rpcMsg;
syncRequestVoteReply2RpcMsg(pReply, &rpcMsg);
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
syncRequestVoteReplyDestroy(pReply);
return ret;
}
#endif
static bool syncNodeOnRequestVoteLogOK(SSyncNode* pSyncNode, SyncRequestVote* pMsg) { static bool syncNodeOnRequestVoteLogOK(SSyncNode* pSyncNode, SyncRequestVote* pMsg) {
SyncTerm myLastTerm = syncNodeGetLastTerm(pSyncNode); SyncTerm myLastTerm = syncNodeGetLastTerm(pSyncNode);
SyncIndex myLastIndex = syncNodeGetLastIndex(pSyncNode); SyncIndex myLastIndex = syncNodeGetLastIndex(pSyncNode);
......
...@@ -93,65 +93,6 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) ...@@ -93,65 +93,6 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg)
return 0; return 0;
} }
#if 0
int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) {
int32_t ret = 0;
char logBuf[128] = {0};
snprintf(logBuf, sizeof(logBuf), "==syncNodeOnRequestVoteReplyCb== term:%" PRIu64, ths->pRaftStore->currentTerm);
syncRequestVoteReplyLog2(logBuf, pMsg);
if (pMsg->term < ths->pRaftStore->currentTerm) {
sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term,
ths->pRaftStore->currentTerm);
return ret;
}
// ASSERT(!(pMsg->term > ths->pRaftStore->currentTerm));
// no need this code, because if I receive reply.term, then I must have sent for that term.
// if (pMsg->term > ths->pRaftStore->currentTerm) {
// syncNodeUpdateTerm(ths, pMsg->term);
// }
if (pMsg->term > ths->pRaftStore->currentTerm) {
char logBuf[128] = {0};
snprintf(logBuf, sizeof(logBuf), "syncNodeOnRequestVoteReplyCb error term, receive:%" PRIu64 " current:%" PRIu64, pMsg->term,
ths->pRaftStore->currentTerm);
syncNodePrint2(logBuf, ths);
sError("%s", logBuf);
return ret;
}
ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
// This tallies votes even when the current state is not Candidate,
// but they won't be looked at, so it doesn't matter.
if (ths->state == TAOS_SYNC_STATE_CANDIDATE) {
votesRespondAdd(ths->pVotesRespond, pMsg);
if (pMsg->voteGranted) {
// add vote
voteGrantedVote(ths->pVotesGranted, pMsg);
// maybe to leader
if (voteGrantedMajority(ths->pVotesGranted)) {
if (!ths->pVotesGranted->toLeader) {
syncNodeCandidate2Leader(ths);
// prevent to leader again!
ths->pVotesGranted->toLeader = true;
}
}
} else {
;
// do nothing
// UNCHANGED <<votesGranted, voterLog>>
}
}
return ret;
}
#endif
int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) { int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) {
int32_t ret = 0; int32_t ret = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册