From 736862541e86b1f4b0749b908bf9eac1b6b0b4d7 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Fri, 10 Jun 2022 15:19:11 +0800 Subject: [PATCH] fix(sync): restart with config change --- source/libs/sync/inc/syncInt.h | 4 +- source/libs/sync/src/syncAppendEntries.c | 19 +++++-- source/libs/sync/src/syncAppendEntriesReply.c | 5 +- source/libs/sync/src/syncCommit.c | 12 ++++- source/libs/sync/src/syncMain.c | 50 +++++++++++-------- source/libs/sync/src/syncRaftLog.c | 4 +- source/libs/sync/src/syncSnapshot.c | 49 ++++++++++-------- 7 files changed, 88 insertions(+), 55 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 10218f69e6..83f0bd7dd8 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -201,8 +201,8 @@ void syncNodeRelease(SSyncNode* pNode); // raft state change -------------- void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term); -void syncNodeBecomeFollower(SSyncNode* pSyncNode); -void syncNodeBecomeLeader(SSyncNode* pSyncNode); +void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr); +void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr); void syncNodeCandidate2Leader(SSyncNode* pSyncNode); void syncNodeFollower2Candidate(SSyncNode* pSyncNode); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 3c558b60c8..ae4ccaf2d5 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -150,7 +150,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { "ths->state:%d, logOK:%d", pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK); - syncNodeBecomeFollower(ths); + syncNodeBecomeFollower(ths, "from candidate by append entries"); // ret or reply? return ret; @@ -380,9 +380,9 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { // change isStandBy to normal if (!isDrop) { if (ths->state == TAOS_SYNC_STATE_LEADER) { - syncNodeBecomeLeader(ths); + syncNodeBecomeLeader(ths, "config change"); } else { - syncNodeBecomeFollower(ths); + syncNodeBecomeFollower(ths, "config change"); } } @@ -469,7 +469,7 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) { // delete confict entries code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin); ASSERT(code == 0); - sInfo("sync event log truncate, from %ld to %ld", delBegin, delEnd); + sInfo("sync event vgId:%d log truncate, from %ld to %ld", ths->vgId, delBegin, delEnd); logStoreSimpleLog2("after syncNodeMakeLogSame", ths->pLogStore); return code; @@ -571,7 +571,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs if (condition) { sTrace("recv SyncAppendEntries, candidate to follower"); - syncNodeBecomeFollower(ths); + syncNodeBecomeFollower(ths, "from candidate by append entries"); // do not reply? return ret; } @@ -742,6 +742,15 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs if (pMsg->commitIndex > ths->commitIndex) { // has commit entry in local if (pMsg->commitIndex <= ths->pLogStore->syncLogLastIndex(ths->pLogStore)) { + // advance commit index to sanpshot first + SSnapshot snapshot; + ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); + if (snapshot.lastApplyIndex > ths->commitIndex) { + sInfo("sync event vgId:%d commit by snapshot from index:%ld to index:%ld, %s", ths->vgId, ths->commitIndex, + snapshot.lastApplyIndex, syncUtilState2String(ths->state)); + ths->commitIndex = snapshot.lastApplyIndex; + } + SyncIndex beginIndex = ths->commitIndex + 1; SyncIndex endIndex = pMsg->commitIndex; diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 0aa69b9252..af83b3ac94 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -184,8 +184,9 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries char* s = snapshotSender2Str(pSender); sInfo( - "sync event snapshot send to %s:%d start sender first time, lastApplyIndex:%ld lastApplyTerm:%lu sender:%s", - host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, s); + "sync event vgId:%d snapshot send to %s:%d start sender first time, lastApplyIndex:%ld lastApplyTerm:%lu " + "sender:%s", + ths->vgId, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, s); taosMemoryFree(s); } diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index c092b31adf..96f60be51b 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -48,10 +48,18 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pNextIndex", pSyncNode->pNextIndex); syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pMatchIndex", pSyncNode->pMatchIndex); + // advance commit index to sanpshot first + SSnapshot snapshot; + pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); + if (snapshot.lastApplyIndex > pSyncNode->commitIndex) { + sInfo("sync event vgId:%d commit by snapshot from index:%ld to index:%ld, %s", pSyncNode->vgId, + pSyncNode->commitIndex, snapshot.lastApplyIndex, syncUtilState2String(pSyncNode->state)); + pSyncNode->commitIndex = snapshot.lastApplyIndex; + } + // update commit index SyncIndex newCommitIndex = pSyncNode->commitIndex; - for (SyncIndex index = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore); index > pSyncNode->commitIndex; - --index) { + for (SyncIndex index = syncNodeGetLastIndex(pSyncNode); index > pSyncNode->commitIndex; --index) { bool agree = syncAgree(pSyncNode, index); sTrace("syncMaybeAdvanceCommitIndex syncAgree:%d, index:%ld, pSyncNode->commitIndex:%ld", agree, index, pSyncNode->commitIndex); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index fcbe7063a4..f124cff786 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -411,6 +411,8 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { SSyncInfo* pSyncInfo = (SSyncInfo*)pOldSyncInfo; + sInfo("sync event vgId:%d sync open", pSyncInfo->vgId); + SSyncNode* pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode)); assert(pSyncNode != NULL); memset(pSyncNode, 0, sizeof(SSyncNode)); @@ -628,7 +630,7 @@ void syncNodeStart(SSyncNode* pSyncNode) { // start raft if (pSyncNode->replicaNum == 1) { raftStoreNextTerm(pSyncNode->pRaftStore); - syncNodeBecomeLeader(pSyncNode); + syncNodeBecomeLeader(pSyncNode, "one replica start"); syncNodeLog2("==state change become leader immediately==", pSyncNode); @@ -654,7 +656,7 @@ void syncNodeStart(SSyncNode* pSyncNode) { return; } - syncNodeBecomeFollower(pSyncNode); + syncNodeBecomeFollower(pSyncNode, "first start"); // for test int32_t ret = 0; @@ -687,6 +689,8 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) { } void syncNodeClose(SSyncNode* pSyncNode) { + sInfo("sync event vgId:%d sync close", pSyncNode->vgId); + int32_t ret; assert(pSyncNode != NULL); @@ -1149,13 +1153,13 @@ void syncNodeRelease(SSyncNode* pNode) { taosReleaseRef(tsNodeRefId, pNode->rid) void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { if (term > pSyncNode->pRaftStore->currentTerm) { raftStoreSetTerm(pSyncNode->pRaftStore, term); - syncNodeBecomeFollower(pSyncNode); + syncNodeBecomeFollower(pSyncNode, "update term"); raftStoreClearVote(pSyncNode->pRaftStore); } } -void syncNodeBecomeFollower(SSyncNode* pSyncNode) { - sInfo("sync event become follower"); +void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { + sInfo("sync event vgId:%d become follower, %s", pSyncNode->vgId, debugStr); // maybe clear leader cache if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { @@ -1188,8 +1192,8 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode) { // evoterLog |-> voterLog[i]]} // /\ UNCHANGED <> // -void syncNodeBecomeLeader(SSyncNode* pSyncNode) { - sInfo("sync event become leader"); +void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { + sInfo("sync event vgId:%d become leader, %s", pSyncNode->vgId, debugStr); // state change pSyncNode->state = TAOS_SYNC_STATE_LEADER; @@ -1241,7 +1245,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode) { void syncNodeCandidate2Leader(SSyncNode* pSyncNode) { assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE); assert(voteGrantedMajority(pSyncNode->pVotesGranted)); - syncNodeBecomeLeader(pSyncNode); + syncNodeBecomeLeader(pSyncNode, "candidate to leader"); syncNodeLog2("==state change syncNodeCandidate2Leader==", pSyncNode); @@ -1264,14 +1268,14 @@ void syncNodeFollower2Candidate(SSyncNode* pSyncNode) { void syncNodeLeader2Follower(SSyncNode* pSyncNode) { assert(pSyncNode->state == TAOS_SYNC_STATE_LEADER); - syncNodeBecomeFollower(pSyncNode); + syncNodeBecomeFollower(pSyncNode, "leader to follower"); syncNodeLog2("==state change syncNodeLeader2Follower==", pSyncNode); } void syncNodeCandidate2Follower(SSyncNode* pSyncNode) { assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE); - syncNodeBecomeFollower(pSyncNode); + syncNodeBecomeFollower(pSyncNode, "candidate to follower"); syncNodeLog2("==state change syncNodeCandidate2Follower==", pSyncNode); } @@ -1728,17 +1732,19 @@ const char* syncStr(ESyncState state) { int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) { int32_t code = 0; ESyncState state = flag; - sInfo("sync event commit from index:%" PRId64 " to index:%" PRId64 ", %s", beginIndex, endIndex, - syncUtilState2String(state)); + sInfo("sync event vgId:%d commit by wal from index:%" PRId64 " to index:%" PRId64 ", %s", ths->vgId, beginIndex, + endIndex, syncUtilState2String(state)); - // maybe execute by leader, skip snapshot - SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; - if (ths->pFsm->FpGetSnapshot != NULL) { - ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); - } - if (beginIndex <= snapshot.lastApplyIndex) { - beginIndex = snapshot.lastApplyIndex + 1; - } + /* + // maybe execute by leader, skip snapshot + SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; + if (ths->pFsm->FpGetSnapshot != NULL) { + ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); + } + if (beginIndex <= snapshot.lastApplyIndex) { + beginIndex = snapshot.lastApplyIndex + 1; + } + */ // execute fsm if (ths->pFsm != NULL) { @@ -1795,9 +1801,9 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, // change isStandBy to normal if (!isDrop) { if (ths->state == TAOS_SYNC_STATE_LEADER) { - syncNodeBecomeLeader(ths); + syncNodeBecomeLeader(ths, "config change"); } else { - syncNodeBecomeFollower(ths); + syncNodeBecomeFollower(ths, "config change"); } } diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index e3ce4579fd..c53e5916ae 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -162,8 +162,8 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr walFsync(pWal, true); - sTrace("sync event write index:%ld, %s, isStandBy:%d, msgType:%s, originalRpcType:%s", pEntry->index, - syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->pRaftCfg->isStandBy, + sTrace("sync event vgId:%d write index:%ld, %s, isStandBy:%d, msgType:%s, originalRpcType:%s", pData->pSyncNode->vgId, + pEntry->index, syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->pRaftCfg->isStandBy, TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType)); return code; diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index a68312d07f..af139ccf6e 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -109,8 +109,10 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) { char host[128]; uint16_t port; syncUtilU642Addr(pSender->pSyncNode->replicasId[pSender->replicaIndex].addr, host, sizeof(host), &port); - sTrace("sync event snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu send msg:%s", host, - port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, msgStr); + sTrace( + "sync event vgId:%d snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu send msg:%s", + pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, + pSender->snapshot.lastApplyTerm, msgStr); taosMemoryFree(msgStr); syncSnapshotSendDestroy(pMsg); @@ -230,13 +232,17 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { uint16_t port; syncUtilU642Addr(pSender->pSyncNode->replicasId[pSender->replicaIndex].addr, host, sizeof(host), &port); if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) { - sTrace("sync event snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu send msg:%s", - host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, - msgStr); + sTrace( + "sync event vgId:%d snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu send " + "msg:%s", + pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, + pSender->snapshot.lastApplyTerm, msgStr); } else { - sTrace("sync event snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu send msg:%s", - host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, - msgStr); + sTrace( + "sync event vgId:%d snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu send " + "msg:%s", + pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, + pSender->snapshot.lastApplyTerm, msgStr); } taosMemoryFree(msgStr); @@ -264,8 +270,8 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { char host[128]; uint16_t port; syncUtilU642Addr(pSender->pSyncNode->replicasId[pSender->replicaIndex].addr, host, sizeof(host), &port); - sTrace("sync event snapshot send to %s:%d resend seq:%d ack:%d send msg:%s", host, port, pSender->seq, pSender->ack, - msgStr); + sTrace("sync event vgId:%d snapshot send to %s:%d resend seq:%d ack:%d send msg:%s", pSender->pSyncNode->vgId, host, + port, pSender->seq, pSender->ack, msgStr); taosMemoryFree(msgStr); syncSnapshotSendDestroy(pMsg); @@ -476,8 +482,8 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { char host[128]; uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - sTrace("sync event snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", host, port, - pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr); + sTrace("sync event vgId:%d snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", + pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr); taosMemoryFree(msgStr); } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) { @@ -495,9 +501,11 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); sInfo( - "sync event snapshot recv from %s:%d finish, update log begin index:%ld, snapshot.lastApplyIndex:%ld, " + "sync event vgId:%d snapshot recv from %s:%d finish, update log begin index:%ld, " + "snapshot.lastApplyIndex:%ld, " "snapshot.lastApplyTerm:%lu, raft log:%s", - host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm, logSimpleStr); + pSyncNode->vgId, host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm, + logSimpleStr); taosMemoryFree(logSimpleStr); pReceiver->pWriter = NULL; @@ -506,8 +514,8 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { needRsp = true; char *msgStr = syncSnapshotSend2Str(pMsg); - sTrace("sync event snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", host, port, - pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr); + sTrace("sync event vgId:%d snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", + pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr); taosMemoryFree(msgStr); } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) { @@ -520,8 +528,9 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); char *msgStr = syncSnapshotSend2Str(pMsg); - sTrace("sync event snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", host, - port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr); + sTrace( + "sync event vgId:%d snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", + pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr); taosMemoryFree(msgStr); @@ -539,8 +548,8 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { char host[128]; uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - sTrace("sync event snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", host, - port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr); + sTrace("sync event vgId:%d snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", + pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr); taosMemoryFree(msgStr); } else { -- GitLab