未验证 提交 e6e07958 编写于 作者: L Li Minghao 提交者: GitHub

Merge pull request #15789 from taosdata/feature/3.0_mhli

refactor(sync): make leader life longer
......@@ -212,6 +212,7 @@ void syncNodeRelease(SSyncNode* pNode);
// raft state change --------------
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term);
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term);
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr);
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr);
......
......@@ -717,24 +717,15 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
// maybe update commit index, leader notice me
if (pMsg->commitIndex > ths->commitIndex) {
// has commit entry in local
if (pMsg->commitIndex <= ths->pLogStore->syncLogLastIndex(ths->pLogStore)) {
// advance commit index to sanpshot first
SSnapshot snapshot;
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex > ths->commitIndex) {
SyncIndex commitBegin = ths->commitIndex;
SyncIndex commitEnd = snapshot.lastApplyIndex;
ths->commitIndex = snapshot.lastApplyIndex;
SyncIndex lastIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "commit by snapshot from index:%" PRId64 " to index:%" PRId64,
commitBegin, commitEnd);
syncNodeEventLog(ths, eventLog);
}
SyncIndex beginIndex = 0;
SyncIndex endIndex = -1;
SyncIndex beginIndex = ths->commitIndex + 1;
SyncIndex endIndex = pMsg->commitIndex;
// has commit entry in local
if (pMsg->commitIndex <= lastIndex) {
beginIndex = ths->commitIndex + 1;
endIndex = pMsg->commitIndex;
// update commit index
ths->commitIndex = pMsg->commitIndex;
......@@ -743,10 +734,22 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
code = ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex);
ASSERT(code == 0);
code = syncNodeCommit(ths, beginIndex, endIndex, ths->state);
} else if (pMsg->commitIndex > lastIndex && ths->commitIndex < lastIndex) {
beginIndex = ths->commitIndex + 1;
endIndex = lastIndex;
// update commit index, speed up
ths->commitIndex = lastIndex;
// call back Wal
code = ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex);
ASSERT(code == 0);
}
code = syncNodeCommit(ths, beginIndex, endIndex, ths->state);
ASSERT(code == 0);
}
return 0;
}
} while (0);
......
......@@ -73,7 +73,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
ASSERT(pEntry != NULL);
// cannot commit, even if quorum agree. need check term!
if (pEntry->term == pSyncNode->pRaftStore->currentTerm) {
if (pEntry->term <= pSyncNode->pRaftStore->currentTerm) {
// update commit index
newCommitIndex = index;
......
......@@ -1562,7 +1562,7 @@ char* syncNode2Str(const SSyncNode* pSyncNode) {
return serialized;
}
void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
int32_t userStrLen = strlen(str);
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
......@@ -1634,7 +1634,7 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
taosMemoryFree(pCfgStr);
}
void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) {
inline void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) {
int32_t userStrLen = strlen(str);
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
......@@ -1701,7 +1701,7 @@ void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) {
taosMemoryFree(pCfgStr);
}
char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
inline char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
int len = 256;
char* s = (char*)taosMemoryMalloc(len);
......@@ -1724,7 +1724,7 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
return s;
}
bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) {
inline bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) {
bool b1 = false;
bool b2 = false;
......@@ -1987,6 +1987,12 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
}
}
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
if (term > pSyncNode->pRaftStore->currentTerm) {
raftStoreSetTerm(pSyncNode->pRaftStore, term);
}
}
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
// maybe clear leader cache
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
......@@ -2614,7 +2620,7 @@ int32_t syncNodeOnClientRequestBatchCb(SSyncNode* ths, SyncClientRequestBatch* p
// fsync once
SSyncLogStoreData* pData = ths->pLogStore->data;
SWal* pWal = pData->pWal;
walFsync(pWal, true);
walFsync(pWal, false);
if (ths->replicaNum > 1) {
// if multi replica, start replicate right now
......@@ -2797,11 +2803,28 @@ bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
}
int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
if (beginIndex > endIndex) {
return 0;
}
// advance commit index to sanpshot first
SSnapshot snapshot = {0};
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex >= beginIndex) {
char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "commit by snapshot from index:%" PRId64 " to index:%" PRId64, beginIndex,
snapshot.lastApplyIndex);
syncNodeEventLog(ths, eventLog);
// update begin index
beginIndex = snapshot.lastApplyIndex + 1;
}
int32_t code = 0;
ESyncState state = flag;
char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "commit wal from index:%" PRId64 " to index:%" PRId64, beginIndex, endIndex);
snprintf(eventLog, sizeof(eventLog), "commit by wal from index:%" PRId64 " to index:%" PRId64, beginIndex, endIndex);
syncNodeEventLog(ths, eventLog);
// execute fsm
......@@ -2942,7 +2965,7 @@ bool syncNodeCanChange(SSyncNode* pSyncNode) {
return true;
}
void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) {
inline void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
......@@ -2953,7 +2976,7 @@ void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, c
syncNodeEventLog(pSyncNode, logBuf);
}
void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) {
inline void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) {
char logBuf[256];
char host[64];
uint16_t port;
......@@ -2964,7 +2987,7 @@ void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, c
syncNodeEventLog(pSyncNode, logBuf);
}
void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
inline void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
......@@ -2974,7 +2997,7 @@ void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteRepl
syncNodeEventLog(pSyncNode, logBuf);
}
void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
inline void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
......@@ -2984,7 +3007,7 @@ void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteRepl
syncNodeEventLog(pSyncNode, logBuf);
}
void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
inline void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
......@@ -2999,7 +3022,7 @@ void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs
syncNodeEventLog(pSyncNode, logBuf);
}
void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
inline void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
......@@ -3014,7 +3037,7 @@ void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs
syncNodeEventLog(pSyncNode, logBuf);
}
void syncLogSendAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s) {
inline void syncLogSendAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
......@@ -3027,7 +3050,7 @@ void syncLogSendAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntries
syncNodeEventLog(pSyncNode, logBuf);
}
void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s) {
inline void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
......@@ -3040,7 +3063,7 @@ void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntries
syncNodeEventLog(pSyncNode, logBuf);
}
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
inline void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
......@@ -3052,7 +3075,7 @@ void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries
syncNodeEventLog(pSyncNode, logBuf);
}
void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
inline void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
......
......@@ -237,51 +237,6 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
return 0;
}
#if 0
static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
SyncIndex writeIndex = raftLogWriteIndex(pLogStore);
if (pEntry->index != writeIndex) {
sError("vgId:%d, wal write index error, entry-index:%" PRId64 " update to %" PRId64, pData->pSyncNode->vgId,
pEntry->index, writeIndex);
pEntry->index = writeIndex;
}
int code = 0;
SWalSyncInfo syncMeta;
syncMeta.isWeek = pEntry->isWeak;
syncMeta.seqNum = pEntry->seqNum;
syncMeta.term = pEntry->term;
code = walWriteWithSyncInfo(pWal, pEntry->index, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen);
if (code != 0) {
int32_t err = terrno;
const char* errStr = tstrerror(err);
int32_t sysErr = errno;
const char* sysErrStr = strerror(errno);
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "wal write error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s",
pEntry->index, err, err, errStr, sysErr, sysErrStr);
syncNodeErrorLog(pData->pSyncNode, logBuf);
ASSERT(0);
}
// walFsync(pWal, true);
do {
char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "write index:%" PRId64 ", type:%s,%d, type2:%s,%d", pEntry->index,
TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType);
syncNodeEventLog(pData->pSyncNode, eventLog);
} while (0);
return code;
}
#endif
// entry found, return 0
// entry not found, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST
// other error, return -1
......@@ -400,45 +355,6 @@ static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** pp
//-------------------------------
// log[0 .. n]
#if 0
int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
SyncIndex lastIndex = logStoreLastIndex(pLogStore);
ASSERT(pEntry->index == lastIndex + 1);
int code = 0;
SWalSyncInfo syncMeta;
syncMeta.isWeek = pEntry->isWeak;
syncMeta.seqNum = pEntry->seqNum;
syncMeta.term = pEntry->term;
code = walWriteWithSyncInfo(pWal, pEntry->index, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen);
if (code != 0) {
int32_t err = terrno;
const char* errStr = tstrerror(err);
int32_t sysErr = errno;
const char* sysErrStr = strerror(errno);
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "wal write error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s",
pEntry->index, err, err, errStr, sysErr, sysErrStr);
syncNodeErrorLog(pData->pSyncNode, logBuf);
ASSERT(0);
}
// walFsync(pWal, true);
char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "old write index:%" PRId64 ", type:%s,%d, type2:%s,%d", pEntry->index,
TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType);
syncNodeEventLog(pData->pSyncNode, eventLog);
return code;
}
#endif
int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
......
......@@ -140,9 +140,6 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) {
sError("vgId:%d, sync get pre term error, nextIndex:%" PRId64 ", update next-index:%" PRId64
", match-index:%d, raftid:%" PRId64,
pSyncNode->vgId, nextIndex, newNextIndex, SYNC_INDEX_INVALID, pDestId->addr);
// syncNodeRestartNowHeartbeatTimer(pSyncNode);
syncNodeStartNowHeartbeatTimer(pSyncNode);
return -1;
}
......
......@@ -51,15 +51,20 @@ int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {
return -1;
}
bool logOK = (pMsg->lastLogTerm > ths->pLogStore->getLastTerm(ths->pLogStore)) ||
((pMsg->lastLogTerm == ths->pLogStore->getLastTerm(ths->pLogStore)) &&
(pMsg->lastLogIndex >= ths->pLogStore->getLastIndex(ths->pLogStore)));
// maybe update term
if (pMsg->term > ths->pRaftStore->currentTerm) {
syncNodeUpdateTerm(ths, pMsg->term);
if (logOK) {
syncNodeUpdateTerm(ths, pMsg->term);
} else {
syncNodeUpdateTermWithoutStepDown(ths, pMsg->term);
}
}
ASSERT(pMsg->term <= ths->pRaftStore->currentTerm);
bool logOK = (pMsg->lastLogTerm > ths->pLogStore->getLastTerm(ths->pLogStore)) ||
((pMsg->lastLogTerm == ths->pLogStore->getLastTerm(ths->pLogStore)) &&
(pMsg->lastLogIndex >= ths->pLogStore->getLastIndex(ths->pLogStore)));
bool grant = (pMsg->term == ths->pRaftStore->currentTerm) && logOK &&
((!raftStoreHasVoted(ths->pRaftStore)) || (syncUtilSameId(&(ths->pRaftStore->voteFor), &(pMsg->srcId))));
if (grant) {
......@@ -94,48 +99,6 @@ int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {
return ret;
}
#if 0
int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {
int32_t ret = 0;
char logBuf[128] = {0};
snprintf(logBuf, sizeof(logBuf), "==syncNodeOnRequestVoteCb== term:%" PRIu64, ths->pRaftStore->currentTerm);
syncRequestVoteLog2(logBuf, pMsg);
if (pMsg->term > ths->pRaftStore->currentTerm) {
syncNodeUpdateTerm(ths, pMsg->term);
}
ASSERT(pMsg->term <= ths->pRaftStore->currentTerm);
bool logOK = (pMsg->lastLogTerm > ths->pLogStore->getLastTerm(ths->pLogStore)) ||
((pMsg->lastLogTerm == ths->pLogStore->getLastTerm(ths->pLogStore)) &&
(pMsg->lastLogIndex >= ths->pLogStore->getLastIndex(ths->pLogStore)));
bool grant = (pMsg->term == ths->pRaftStore->currentTerm) && logOK &&
((!raftStoreHasVoted(ths->pRaftStore)) || (syncUtilSameId(&(ths->pRaftStore->voteFor), &(pMsg->srcId))));
if (grant) {
// maybe has already voted for pMsg->srcId
// vote again, no harm
raftStoreVote(ths->pRaftStore, &(pMsg->srcId));
// forbid elect for this round
syncNodeResetElectTimer(ths);
}
SyncRequestVoteReply* pReply = syncRequestVoteReplyBuild(ths->vgId);
pReply->srcId = ths->myRaftId;
pReply->destId = pMsg->srcId;
pReply->term = ths->pRaftStore->currentTerm;
pReply->voteGranted = grant;
SRpcMsg rpcMsg;
syncRequestVoteReply2RpcMsg(pReply, &rpcMsg);
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
syncRequestVoteReplyDestroy(pReply);
return ret;
}
#endif
static bool syncNodeOnRequestVoteLogOK(SSyncNode* pSyncNode, SyncRequestVote* pMsg) {
SyncTerm myLastTerm = syncNodeGetLastTerm(pSyncNode);
SyncIndex myLastIndex = syncNodeGetLastIndex(pSyncNode);
......@@ -200,13 +163,18 @@ int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg) {
return -1;
}
bool logOK = syncNodeOnRequestVoteLogOK(ths, pMsg);
// maybe update term
if (pMsg->term > ths->pRaftStore->currentTerm) {
syncNodeUpdateTerm(ths, pMsg->term);
if (logOK) {
syncNodeUpdateTerm(ths, pMsg->term);
} else {
syncNodeUpdateTermWithoutStepDown(ths, pMsg->term);
}
}
ASSERT(pMsg->term <= ths->pRaftStore->currentTerm);
bool logOK = syncNodeOnRequestVoteLogOK(ths, pMsg);
bool grant = (pMsg->term == ths->pRaftStore->currentTerm) && logOK &&
((!raftStoreHasVoted(ths->pRaftStore)) || (syncUtilSameId(&(ths->pRaftStore->voteFor), &(pMsg->srcId))));
if (grant) {
......
......@@ -93,65 +93,6 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg)
return 0;
}
#if 0
int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) {
int32_t ret = 0;
char logBuf[128] = {0};
snprintf(logBuf, sizeof(logBuf), "==syncNodeOnRequestVoteReplyCb== term:%" PRIu64, ths->pRaftStore->currentTerm);
syncRequestVoteReplyLog2(logBuf, pMsg);
if (pMsg->term < ths->pRaftStore->currentTerm) {
sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term,
ths->pRaftStore->currentTerm);
return ret;
}
// ASSERT(!(pMsg->term > ths->pRaftStore->currentTerm));
// no need this code, because if I receive reply.term, then I must have sent for that term.
// if (pMsg->term > ths->pRaftStore->currentTerm) {
// syncNodeUpdateTerm(ths, pMsg->term);
// }
if (pMsg->term > ths->pRaftStore->currentTerm) {
char logBuf[128] = {0};
snprintf(logBuf, sizeof(logBuf), "syncNodeOnRequestVoteReplyCb error term, receive:%" PRIu64 " current:%" PRIu64, pMsg->term,
ths->pRaftStore->currentTerm);
syncNodePrint2(logBuf, ths);
sError("%s", logBuf);
return ret;
}
ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
// This tallies votes even when the current state is not Candidate,
// but they won't be looked at, so it doesn't matter.
if (ths->state == TAOS_SYNC_STATE_CANDIDATE) {
votesRespondAdd(ths->pVotesRespond, pMsg);
if (pMsg->voteGranted) {
// add vote
voteGrantedVote(ths->pVotesGranted, pMsg);
// maybe to leader
if (voteGrantedMajority(ths->pVotesGranted)) {
if (!ths->pVotesGranted->toLeader) {
syncNodeCandidate2Leader(ths);
// prevent to leader again!
ths->pVotesGranted->toLeader = true;
}
}
} else {
;
// do nothing
// UNCHANGED <<votesGranted, voterLog>>
}
}
return ret;
}
#endif
int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) {
int32_t ret = 0;
......@@ -184,6 +125,14 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl
// This tallies votes even when the current state is not Candidate,
// but they won't be looked at, so it doesn't matter.
if (ths->state == TAOS_SYNC_STATE_CANDIDATE) {
if (ths->pVotesRespond->term != pMsg->term) {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "vote respond error vote-respond-mgr term:%lu, msg term:lu",
ths->pVotesRespond->term, pMsg->term);
syncNodeErrorLog(ths, logBuf);
return -1;
}
votesRespondAdd(ths->pVotesRespond, pMsg);
if (pMsg->voteGranted) {
// add vote
......
......@@ -81,4 +81,16 @@ for file in `ls ${logpath}/log.dnode*vgId*`;do
done
echo ""
echo "generate log.commit ..."
tmpfile=${logpath}/log.commits.tmp
touch ${tmpfile}
for file in `ls ${logpath}/log.dnode*.vgId*.commit`;do
line=`cat ${file} | tail -n1`
echo $line | awk '{print $5, $0}' >> ${tmpfile}
done
cat ${tmpfile} | sort -k1 > ${logpath}/log.commits
exit 0
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册