From 90e7d794f3da673c3fc0c0b37c6cd82a1b07516e Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 20 Jul 2022 13:32:38 +0800 Subject: [PATCH] fix(sync): reset commit index by snapshot when open sync --- source/libs/sync/src/syncMain.c | 13 ++++++++++++- source/libs/sync/src/syncRequestVoteReply.c | 20 ++++++++++---------- 2 files changed, 22 insertions(+), 11 deletions(-) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 9cd1d62361..d55a385bb8 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -999,7 +999,18 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { // init TLA+ log vars pSyncNode->pLogStore = logStoreCreate(pSyncNode); ASSERT(pSyncNode->pLogStore != NULL); - pSyncNode->commitIndex = SYNC_INDEX_INVALID; + + SyncIndex commitIndex = SYNC_INDEX_INVALID; + if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { + SSnapshot snapshot = {0}; + int32_t code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); + ASSERT(code == 0); + if (snapshot.lastApplyIndex > commitIndex) { + commitIndex = snapshot.lastApplyIndex; + syncNodeEventLog(pSyncNode, "reset commit index by snapshot"); + } + } + pSyncNode->commitIndex = commitIndex; // timer ms init pSyncNode->pingBaseLine = PING_TIMER_MS; diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c index 299b5986df..9ae70ca8da 100644 --- a/source/libs/sync/src/syncRequestVoteReply.c +++ b/source/libs/sync/src/syncRequestVoteReply.c @@ -46,8 +46,8 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), "recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d} ", host, port, - pMsg->term, pMsg->voteGranted); + snprintf(logBuf, sizeof(logBuf), "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d} ", host, + port, pMsg->term, pMsg->voteGranted); syncNodeEventLog(ths, logBuf); } while (0); @@ -58,7 +58,7 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); char logBuf[256]; snprintf(logBuf, sizeof(logBuf), - "recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, maybe replica dropped", host, port, + "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, maybe replica dropped", host, port, pMsg->term, pMsg->voteGranted); syncNodeErrorLog(ths, logBuf); return -1; @@ -71,7 +71,7 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); char logBuf[256]; snprintf(logBuf, sizeof(logBuf), - "recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, drop stale response", host, port, + "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, drop stale response", host, port, pMsg->term, pMsg->voteGranted); syncNodeErrorLog(ths, logBuf); return -1; @@ -88,7 +88,7 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), "recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, error term", + snprintf(logBuf, sizeof(logBuf), "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, error term", host, port, pMsg->term, pMsg->voteGranted); syncNodeErrorLog(ths, logBuf); return -1; @@ -191,8 +191,8 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), "recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d} ", host, port, - pMsg->term, pMsg->voteGranted); + snprintf(logBuf, sizeof(logBuf), "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d} ", host, + port, pMsg->term, pMsg->voteGranted); syncNodeEventLog(ths, logBuf); } while (0); @@ -203,7 +203,7 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); char logBuf[256]; snprintf(logBuf, sizeof(logBuf), - "recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, maybe replica dropped", host, port, + "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, maybe replica dropped", host, port, pMsg->term, pMsg->voteGranted); syncNodeErrorLog(ths, logBuf); return -1; @@ -216,7 +216,7 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); char logBuf[256]; snprintf(logBuf, sizeof(logBuf), - "recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, drop stale response", host, port, + "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, drop stale response", host, port, pMsg->term, pMsg->voteGranted); syncNodeErrorLog(ths, logBuf); return -1; @@ -233,7 +233,7 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), "recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, error term", + snprintf(logBuf, sizeof(logBuf), "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, error term", host, port, pMsg->term, pMsg->voteGranted); syncNodeErrorLog(ths, logBuf); return -1; -- GitLab