提交 e350f713 编写于 作者: B Benguang Zhao

enh: reset elect timer at the end of callbacks

上级 40d99e45
...@@ -105,6 +105,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -105,6 +105,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
SRpcMsg rpcRsp = {0}; SRpcMsg rpcRsp = {0};
bool accepted = false; bool accepted = false;
SSyncRaftEntry* pEntry = NULL; SSyncRaftEntry* pEntry = NULL;
bool resetElect = false;
// if already drop replica, do not process // if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) { if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
...@@ -137,7 +138,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -137,7 +138,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
} }
syncNodeStepDown(ths, pMsg->term); syncNodeStepDown(ths, pMsg->term);
syncNodeResetElectTimer(ths); resetElect = true;
if (pMsg->dataLen < sizeof(SSyncRaftEntry)) { if (pMsg->dataLen < sizeof(SSyncRaftEntry)) {
sError("vgId:%d, incomplete append entries received. prev index:%" PRId64 ", term:%" PRId64 ", datalen:%d", sError("vgId:%d, incomplete append entries received. prev index:%" PRId64 ", term:%" PRId64 ", datalen:%d",
...@@ -184,10 +185,9 @@ _SEND_RESPONSE: ...@@ -184,10 +185,9 @@ _SEND_RESPONSE:
// commit index, i.e. leader notice me // commit index, i.e. leader notice me
if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) { if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
sError("vgId:%d, failed to commit raft fsm log since %s.", ths->vgId, terrstr()); sError("vgId:%d, failed to commit raft fsm log since %s.", ths->vgId, terrstr());
goto _out;
} }
_out: if (resetElect) syncNodeResetElectTimer(ths);
return 0; return 0;
_IGNORE: _IGNORE:
......
...@@ -115,6 +115,5 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) { ...@@ -115,6 +115,5 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) {
ASSERT(ret == 0); ASSERT(ret == 0);
syncNodeResetElectTimer(pSyncNode); syncNodeResetElectTimer(pSyncNode);
return ret; return ret;
} }
...@@ -1593,8 +1593,8 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { ...@@ -1593,8 +1593,8 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
syncNodeStopHeartbeatTimer(pSyncNode); syncNodeStopHeartbeatTimer(pSyncNode);
// reset elect timer // trace log
syncNodeResetElectTimer(pSyncNode); sNTrace(pSyncNode, "become follower %s", debugStr);
// send rsp to client // send rsp to client
syncNodeLeaderChangeRsp(pSyncNode); syncNodeLeaderChangeRsp(pSyncNode);
...@@ -1610,8 +1610,8 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { ...@@ -1610,8 +1610,8 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
// reset log buffer // reset log buffer
syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode); syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
// trace log // reset elect timer
sNTrace(pSyncNode, "become follower %s", debugStr); syncNodeResetElectTimer(pSyncNode);
} }
// TLA+ Spec // TLA+ Spec
...@@ -2277,6 +2277,7 @@ static int32_t syncNodeAppendNoopOld(SSyncNode* ths) { ...@@ -2277,6 +2277,7 @@ static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
SyncHeartbeat* pMsg = pRpcMsg->pCont; SyncHeartbeat* pMsg = pRpcMsg->pCont;
bool resetElect = false;
const STraceId* trace = &pRpcMsg->info.traceId; const STraceId* trace = &pRpcMsg->info.traceId;
char tbuf[40] = {0}; char tbuf[40] = {0};
...@@ -2300,12 +2301,11 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -2300,12 +2301,11 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
if (pMsg->term == currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) { if (pMsg->term == currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs); syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs);
resetElect = true;
syncNodeResetElectTimer(ths);
ths->minMatchIndex = pMsg->minMatchIndex; ths->minMatchIndex = pMsg->minMatchIndex;
if (ths->state == TAOS_SYNC_STATE_FOLLOWER) { if (ths->state == TAOS_SYNC_STATE_FOLLOWER) {
// syncNodeFollowerCommit(ths, pMsg->commitIndex);
SRpcMsg rpcMsgLocalCmd = {0}; SRpcMsg rpcMsgLocalCmd = {0};
(void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId); (void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId);
...@@ -2328,7 +2328,6 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -2328,7 +2328,6 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
} }
if (pMsg->term >= currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) { if (pMsg->term >= currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) {
// syncNodeStepDown(ths, pMsg->term);
SRpcMsg rpcMsgLocalCmd = {0}; SRpcMsg rpcMsgLocalCmd = {0};
(void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId); (void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId);
...@@ -2348,15 +2347,10 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -2348,15 +2347,10 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
} }
} }
/*
// htonl
SMsgHead* pHead = rpcMsg.pCont;
pHead->contLen = htonl(pHead->contLen);
pHead->vgId = htonl(pHead->vgId);
*/
// reply // reply
syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg); syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);
if (resetElect) syncNodeResetElectTimer(ths);
return 0; return 0;
} }
......
...@@ -89,6 +89,7 @@ static bool syncNodeOnRequestVoteLogOK(SSyncNode* ths, SyncRequestVote* pMsg) { ...@@ -89,6 +89,7 @@ static bool syncNodeOnRequestVoteLogOK(SSyncNode* ths, SyncRequestVote* pMsg) {
int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
int32_t ret = 0; int32_t ret = 0;
SyncRequestVote* pMsg = pRpcMsg->pCont; SyncRequestVote* pMsg = pRpcMsg->pCont;
bool resetElect = false;
// if already drop replica, do not process // if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &pMsg->srcId)) { if (!syncNodeInRaftGroup(ths, &pMsg->srcId)) {
...@@ -115,7 +116,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -115,7 +116,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
syncNodeStepDown(ths, currentTerm); syncNodeStepDown(ths, currentTerm);
// forbid elect for this round // forbid elect for this round
syncNodeResetElectTimer(ths); resetElect = true;
} }
// send msg // send msg
...@@ -134,5 +135,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -134,5 +135,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
syncLogRecvRequestVote(ths, pMsg, pReply->voteGranted, ""); syncLogRecvRequestVote(ths, pMsg, pReply->voteGranted, "");
syncLogSendRequestVoteReply(ths, pReply, ""); syncLogSendRequestVoteReply(ths, pReply, "");
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
if (resetElect) syncNodeResetElectTimer(ths);
return 0; return 0;
} }
...@@ -798,7 +798,6 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { ...@@ -798,7 +798,6 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
if (pMsg->term > raftStoreGetTerm(pSyncNode)) { if (pMsg->term > raftStoreGetTerm(pSyncNode)) {
syncNodeStepDown(pSyncNode, pMsg->term); syncNodeStepDown(pSyncNode, pMsg->term);
} }
syncNodeResetElectTimer(pSyncNode);
// state, term, seq/ack // state, term, seq/ack
int32_t code = 0; int32_t code = 0;
...@@ -840,6 +839,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { ...@@ -840,6 +839,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
code = -1; code = -1;
} }
syncNodeResetElectTimer(pSyncNode);
return code; return code;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册