diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index bc3275a9714f5db4f7866b18153953a1881bf8cb..82399f52b93b7d87e5adfdda116c1d60e6535239 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -212,6 +212,7 @@ void syncNodeRelease(SSyncNode* pNode); // raft state change -------------- void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term); +void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term); void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr); void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index a7bc4df281719f7ee50d672518829d18a9abf65c..f31f3dd1aea037b36846cf0fda55a564220e12fb 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -717,24 +717,15 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc // maybe update commit index, leader notice me if (pMsg->commitIndex > ths->commitIndex) { - // has commit entry in local - if (pMsg->commitIndex <= ths->pLogStore->syncLogLastIndex(ths->pLogStore)) { - // advance commit index to sanpshot first - SSnapshot snapshot; - ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot); - if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex > ths->commitIndex) { - SyncIndex commitBegin = ths->commitIndex; - SyncIndex commitEnd = snapshot.lastApplyIndex; - ths->commitIndex = snapshot.lastApplyIndex; + SyncIndex lastIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore); - char eventLog[128]; - snprintf(eventLog, sizeof(eventLog), "commit by snapshot from index:%" PRId64 " to index:%" PRId64, - commitBegin, commitEnd); - syncNodeEventLog(ths, eventLog); - } + SyncIndex beginIndex = 0; + SyncIndex endIndex = -1; - SyncIndex beginIndex = ths->commitIndex + 1; - SyncIndex endIndex = pMsg->commitIndex; + // has commit entry in local + if (pMsg->commitIndex <= lastIndex) { + beginIndex = ths->commitIndex + 1; + endIndex = pMsg->commitIndex; // update commit index ths->commitIndex = pMsg->commitIndex; @@ -743,10 +734,22 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc code = ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex); ASSERT(code == 0); - code = syncNodeCommit(ths, beginIndex, endIndex, ths->state); + } else if (pMsg->commitIndex > lastIndex && ths->commitIndex < lastIndex) { + beginIndex = ths->commitIndex + 1; + endIndex = lastIndex; + + // update commit index, speed up + ths->commitIndex = lastIndex; + + // call back Wal + code = ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex); ASSERT(code == 0); } + + code = syncNodeCommit(ths, beginIndex, endIndex, ths->state); + ASSERT(code == 0); } + return 0; } } while (0); diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index c18c2cc0d596082b256a2a0ee5a670d347068cf6..a603cfff2762bbd4fea088c9a4ad120d0471fce0 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -73,7 +73,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { ASSERT(pEntry != NULL); // cannot commit, even if quorum agree. need check term! - if (pEntry->term == pSyncNode->pRaftStore->currentTerm) { + if (pEntry->term <= pSyncNode->pRaftStore->currentTerm) { // update commit index newCommitIndex = index; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 37d0dff095c0b9be8138ef2b9700190263ba88fd..48ebf824e8b5f7ca71874b53c6f8c2e9f61decfc 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1562,7 +1562,7 @@ char* syncNode2Str(const SSyncNode* pSyncNode) { return serialized; } -void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { +inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { int32_t userStrLen = strlen(str); SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; @@ -1634,7 +1634,7 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { taosMemoryFree(pCfgStr); } -void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) { +inline void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) { int32_t userStrLen = strlen(str); SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; @@ -1701,7 +1701,7 @@ void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) { taosMemoryFree(pCfgStr); } -char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { +inline char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { int len = 256; char* s = (char*)taosMemoryMalloc(len); @@ -1724,7 +1724,7 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { return s; } -bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) { +inline bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) { bool b1 = false; bool b2 = false; @@ -1987,6 +1987,12 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { } } +void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) { + if (term > pSyncNode->pRaftStore->currentTerm) { + raftStoreSetTerm(pSyncNode->pRaftStore, term); + } +} + void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { // maybe clear leader cache if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { @@ -2614,7 +2620,7 @@ int32_t syncNodeOnClientRequestBatchCb(SSyncNode* ths, SyncClientRequestBatch* p // fsync once SSyncLogStoreData* pData = ths->pLogStore->data; SWal* pWal = pData->pWal; - walFsync(pWal, true); + walFsync(pWal, false); if (ths->replicaNum > 1) { // if multi replica, start replicate right now @@ -2797,11 +2803,28 @@ bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) { } int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) { + if (beginIndex > endIndex) { + return 0; + } + + // advance commit index to sanpshot first + SSnapshot snapshot = {0}; + ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot); + if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex >= beginIndex) { + char eventLog[128]; + snprintf(eventLog, sizeof(eventLog), "commit by snapshot from index:%" PRId64 " to index:%" PRId64, beginIndex, + snapshot.lastApplyIndex); + syncNodeEventLog(ths, eventLog); + + // update begin index + beginIndex = snapshot.lastApplyIndex + 1; + } + int32_t code = 0; ESyncState state = flag; char eventLog[128]; - snprintf(eventLog, sizeof(eventLog), "commit wal from index:%" PRId64 " to index:%" PRId64, beginIndex, endIndex); + snprintf(eventLog, sizeof(eventLog), "commit by wal from index:%" PRId64 " to index:%" PRId64, beginIndex, endIndex); syncNodeEventLog(ths, eventLog); // execute fsm @@ -2942,7 +2965,7 @@ bool syncNodeCanChange(SSyncNode* pSyncNode) { return true; } -void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) { +inline void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) { char host[64]; uint16_t port; syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); @@ -2953,7 +2976,7 @@ void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, c syncNodeEventLog(pSyncNode, logBuf); } -void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) { +inline void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) { char logBuf[256]; char host[64]; uint16_t port; @@ -2964,7 +2987,7 @@ void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, c syncNodeEventLog(pSyncNode, logBuf); } -void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) { +inline void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) { char host[64]; uint16_t port; syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); @@ -2974,7 +2997,7 @@ void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteRepl syncNodeEventLog(pSyncNode, logBuf); } -void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) { +inline void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) { char host[64]; uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); @@ -2984,7 +3007,7 @@ void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteRepl syncNodeEventLog(pSyncNode, logBuf); } -void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) { +inline void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) { char host[64]; uint16_t port; syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); @@ -2999,7 +3022,7 @@ void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs syncNodeEventLog(pSyncNode, logBuf); } -void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) { +inline void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) { char host[64]; uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); @@ -3014,7 +3037,7 @@ void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs syncNodeEventLog(pSyncNode, logBuf); } -void syncLogSendAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s) { +inline void syncLogSendAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s) { char host[64]; uint16_t port; syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); @@ -3027,7 +3050,7 @@ void syncLogSendAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntries syncNodeEventLog(pSyncNode, logBuf); } -void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s) { +inline void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s) { char host[64]; uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); @@ -3040,7 +3063,7 @@ void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntries syncNodeEventLog(pSyncNode, logBuf); } -void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) { +inline void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) { char host[64]; uint16_t port; syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); @@ -3052,7 +3075,7 @@ void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries syncNodeEventLog(pSyncNode, logBuf); } -void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) { +inline void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) { char host[64]; uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index bf440f04a06f7c26dfcae36a110a123c39c5bf49..b575e40d86884c9fdd688db03e4cc8492e9ea0d3 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -237,51 +237,6 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr return 0; } -#if 0 -static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { - SSyncLogStoreData* pData = pLogStore->data; - SWal* pWal = pData->pWal; - - SyncIndex writeIndex = raftLogWriteIndex(pLogStore); - if (pEntry->index != writeIndex) { - sError("vgId:%d, wal write index error, entry-index:%" PRId64 " update to %" PRId64, pData->pSyncNode->vgId, - pEntry->index, writeIndex); - pEntry->index = writeIndex; - } - - int code = 0; - SWalSyncInfo syncMeta; - syncMeta.isWeek = pEntry->isWeak; - syncMeta.seqNum = pEntry->seqNum; - syncMeta.term = pEntry->term; - code = walWriteWithSyncInfo(pWal, pEntry->index, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen); - if (code != 0) { - int32_t err = terrno; - const char* errStr = tstrerror(err); - int32_t sysErr = errno; - const char* sysErrStr = strerror(errno); - - char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), "wal write error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", - pEntry->index, err, err, errStr, sysErr, sysErrStr); - syncNodeErrorLog(pData->pSyncNode, logBuf); - - ASSERT(0); - } - - // walFsync(pWal, true); - - do { - char eventLog[128]; - snprintf(eventLog, sizeof(eventLog), "write index:%" PRId64 ", type:%s,%d, type2:%s,%d", pEntry->index, - TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType); - syncNodeEventLog(pData->pSyncNode, eventLog); - } while (0); - - return code; -} -#endif - // entry found, return 0 // entry not found, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST // other error, return -1 @@ -400,45 +355,6 @@ static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** pp //------------------------------- // log[0 .. n] -#if 0 -int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { - SSyncLogStoreData* pData = pLogStore->data; - SWal* pWal = pData->pWal; - - SyncIndex lastIndex = logStoreLastIndex(pLogStore); - ASSERT(pEntry->index == lastIndex + 1); - - int code = 0; - SWalSyncInfo syncMeta; - syncMeta.isWeek = pEntry->isWeak; - syncMeta.seqNum = pEntry->seqNum; - syncMeta.term = pEntry->term; - code = walWriteWithSyncInfo(pWal, pEntry->index, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen); - if (code != 0) { - int32_t err = terrno; - const char* errStr = tstrerror(err); - int32_t sysErr = errno; - const char* sysErrStr = strerror(errno); - - char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), "wal write error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", - pEntry->index, err, err, errStr, sysErr, sysErrStr); - syncNodeErrorLog(pData->pSyncNode, logBuf); - - ASSERT(0); - } - - // walFsync(pWal, true); - - char eventLog[128]; - snprintf(eventLog, sizeof(eventLog), "old write index:%" PRId64 ", type:%s,%d, type2:%s,%d", pEntry->index, - TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType); - syncNodeEventLog(pData->pSyncNode, eventLog); - - return code; -} -#endif - int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 4f2dbcae6702b802bc93109fdc233785a241dc97..bc703e519cf9a0afbbb2788023a3cc8ea2338af1 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -140,9 +140,6 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) { sError("vgId:%d, sync get pre term error, nextIndex:%" PRId64 ", update next-index:%" PRId64 ", match-index:%d, raftid:%" PRId64, pSyncNode->vgId, nextIndex, newNextIndex, SYNC_INDEX_INVALID, pDestId->addr); - - // syncNodeRestartNowHeartbeatTimer(pSyncNode); - syncNodeStartNowHeartbeatTimer(pSyncNode); return -1; } diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index bad32c5f911a5e0bf70aee4cbcda568590d5c36f..b47294a8e1c045dfe884528498fbb8154efb8df5 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -51,15 +51,20 @@ int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) { return -1; } + bool logOK = (pMsg->lastLogTerm > ths->pLogStore->getLastTerm(ths->pLogStore)) || + ((pMsg->lastLogTerm == ths->pLogStore->getLastTerm(ths->pLogStore)) && + (pMsg->lastLogIndex >= ths->pLogStore->getLastIndex(ths->pLogStore))); + // maybe update term if (pMsg->term > ths->pRaftStore->currentTerm) { - syncNodeUpdateTerm(ths, pMsg->term); + if (logOK) { + syncNodeUpdateTerm(ths, pMsg->term); + } else { + syncNodeUpdateTermWithoutStepDown(ths, pMsg->term); + } } ASSERT(pMsg->term <= ths->pRaftStore->currentTerm); - bool logOK = (pMsg->lastLogTerm > ths->pLogStore->getLastTerm(ths->pLogStore)) || - ((pMsg->lastLogTerm == ths->pLogStore->getLastTerm(ths->pLogStore)) && - (pMsg->lastLogIndex >= ths->pLogStore->getLastIndex(ths->pLogStore))); bool grant = (pMsg->term == ths->pRaftStore->currentTerm) && logOK && ((!raftStoreHasVoted(ths->pRaftStore)) || (syncUtilSameId(&(ths->pRaftStore->voteFor), &(pMsg->srcId)))); if (grant) { @@ -94,48 +99,6 @@ int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) { return ret; } -#if 0 -int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) { - int32_t ret = 0; - - char logBuf[128] = {0}; - snprintf(logBuf, sizeof(logBuf), "==syncNodeOnRequestVoteCb== term:%" PRIu64, ths->pRaftStore->currentTerm); - syncRequestVoteLog2(logBuf, pMsg); - - if (pMsg->term > ths->pRaftStore->currentTerm) { - syncNodeUpdateTerm(ths, pMsg->term); - } - ASSERT(pMsg->term <= ths->pRaftStore->currentTerm); - - bool logOK = (pMsg->lastLogTerm > ths->pLogStore->getLastTerm(ths->pLogStore)) || - ((pMsg->lastLogTerm == ths->pLogStore->getLastTerm(ths->pLogStore)) && - (pMsg->lastLogIndex >= ths->pLogStore->getLastIndex(ths->pLogStore))); - bool grant = (pMsg->term == ths->pRaftStore->currentTerm) && logOK && - ((!raftStoreHasVoted(ths->pRaftStore)) || (syncUtilSameId(&(ths->pRaftStore->voteFor), &(pMsg->srcId)))); - if (grant) { - // maybe has already voted for pMsg->srcId - // vote again, no harm - raftStoreVote(ths->pRaftStore, &(pMsg->srcId)); - - // forbid elect for this round - syncNodeResetElectTimer(ths); - } - - SyncRequestVoteReply* pReply = syncRequestVoteReplyBuild(ths->vgId); - pReply->srcId = ths->myRaftId; - pReply->destId = pMsg->srcId; - pReply->term = ths->pRaftStore->currentTerm; - pReply->voteGranted = grant; - - SRpcMsg rpcMsg; - syncRequestVoteReply2RpcMsg(pReply, &rpcMsg); - syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); - syncRequestVoteReplyDestroy(pReply); - - return ret; -} -#endif - static bool syncNodeOnRequestVoteLogOK(SSyncNode* pSyncNode, SyncRequestVote* pMsg) { SyncTerm myLastTerm = syncNodeGetLastTerm(pSyncNode); SyncIndex myLastIndex = syncNodeGetLastIndex(pSyncNode); @@ -200,13 +163,18 @@ int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg) { return -1; } + bool logOK = syncNodeOnRequestVoteLogOK(ths, pMsg); + // maybe update term if (pMsg->term > ths->pRaftStore->currentTerm) { - syncNodeUpdateTerm(ths, pMsg->term); + if (logOK) { + syncNodeUpdateTerm(ths, pMsg->term); + } else { + syncNodeUpdateTermWithoutStepDown(ths, pMsg->term); + } } ASSERT(pMsg->term <= ths->pRaftStore->currentTerm); - bool logOK = syncNodeOnRequestVoteLogOK(ths, pMsg); bool grant = (pMsg->term == ths->pRaftStore->currentTerm) && logOK && ((!raftStoreHasVoted(ths->pRaftStore)) || (syncUtilSameId(&(ths->pRaftStore->voteFor), &(pMsg->srcId)))); if (grant) { diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c index 566b80881f9786426f5aa62e6c44504f92db174e..55553d50485358a79adfadc625dd73ca0c05c251 100644 --- a/source/libs/sync/src/syncRequestVoteReply.c +++ b/source/libs/sync/src/syncRequestVoteReply.c @@ -93,65 +93,6 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) return 0; } -#if 0 -int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) { - int32_t ret = 0; - - char logBuf[128] = {0}; - snprintf(logBuf, sizeof(logBuf), "==syncNodeOnRequestVoteReplyCb== term:%" PRIu64, ths->pRaftStore->currentTerm); - syncRequestVoteReplyLog2(logBuf, pMsg); - - if (pMsg->term < ths->pRaftStore->currentTerm) { - sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term, - ths->pRaftStore->currentTerm); - return ret; - } - - // ASSERT(!(pMsg->term > ths->pRaftStore->currentTerm)); - // no need this code, because if I receive reply.term, then I must have sent for that term. - // if (pMsg->term > ths->pRaftStore->currentTerm) { - // syncNodeUpdateTerm(ths, pMsg->term); - // } - - if (pMsg->term > ths->pRaftStore->currentTerm) { - char logBuf[128] = {0}; - snprintf(logBuf, sizeof(logBuf), "syncNodeOnRequestVoteReplyCb error term, receive:%" PRIu64 " current:%" PRIu64, pMsg->term, - ths->pRaftStore->currentTerm); - syncNodePrint2(logBuf, ths); - sError("%s", logBuf); - return ret; - } - - ASSERT(pMsg->term == ths->pRaftStore->currentTerm); - - // This tallies votes even when the current state is not Candidate, - // but they won't be looked at, so it doesn't matter. - if (ths->state == TAOS_SYNC_STATE_CANDIDATE) { - votesRespondAdd(ths->pVotesRespond, pMsg); - if (pMsg->voteGranted) { - // add vote - voteGrantedVote(ths->pVotesGranted, pMsg); - - // maybe to leader - if (voteGrantedMajority(ths->pVotesGranted)) { - if (!ths->pVotesGranted->toLeader) { - syncNodeCandidate2Leader(ths); - - // prevent to leader again! - ths->pVotesGranted->toLeader = true; - } - } - } else { - ; - // do nothing - // UNCHANGED <> - } - } - - return ret; -} -#endif - int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) { int32_t ret = 0; @@ -184,6 +125,14 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl // This tallies votes even when the current state is not Candidate, // but they won't be looked at, so it doesn't matter. if (ths->state == TAOS_SYNC_STATE_CANDIDATE) { + if (ths->pVotesRespond->term != pMsg->term) { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "vote respond error vote-respond-mgr term:%lu, msg term:lu", + ths->pVotesRespond->term, pMsg->term); + syncNodeErrorLog(ths, logBuf); + return -1; + } + votesRespondAdd(ths->pVotesRespond, pMsg); if (pMsg->voteGranted) { // add vote diff --git a/source/libs/sync/test/sh/a.sh b/source/libs/sync/test/sh/a.sh index 751b42b9c22077d21cbc694392f1b0bab3a0f7d7..3983d30b7cf658f4b155d5b90ab8d395886d46bb 100644 --- a/source/libs/sync/test/sh/a.sh +++ b/source/libs/sync/test/sh/a.sh @@ -81,4 +81,16 @@ for file in `ls ${logpath}/log.dnode*vgId*`;do done +echo "" +echo "generate log.commit ..." +tmpfile=${logpath}/log.commits.tmp +touch ${tmpfile} +for file in `ls ${logpath}/log.dnode*.vgId*.commit`;do + line=`cat ${file} | tail -n1` + echo $line | awk '{print $5, $0}' >> ${tmpfile} +done +cat ${tmpfile} | sort -k1 > ${logpath}/log.commits + exit 0 + +