From 143a2e8552008e2a3fb21c2c0f435ae5816297ff Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Fri, 25 Nov 2022 18:56:14 +0800 Subject: [PATCH] feat: start snapshot replication to rollback in recovery mode --- source/libs/sync/src/syncAppendEntries.c | 2 +- source/libs/sync/src/syncAppendEntriesReply.c | 1 - source/libs/sync/src/syncPipeline.c | 23 +++++++++++++++---- 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index d719169470..3a9e97a08f 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -188,7 +188,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) { goto _IGNORE; } - if (pMsg->prevLogIndex + 1 != pEntry->index) { + if (pMsg->prevLogIndex + 1 != pEntry->index || pEntry->term < 0) { sError("vgId:%d, invalid previous log index in msg. index:%" PRId64 ", term:%" PRId64 ", prevLogIndex:%" PRId64 ", prevLogTerm:%" PRId64, ths->vgId, pEntry->index, pEntry->term, pMsg->prevLogIndex, pMsg->prevLogTerm); diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index b1ebd5d8d1..524abf3c2a 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -85,7 +85,6 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { sError("vgId:%d, failed to get log repl mgr for src addr: 0x%016" PRIx64, ths->vgId, pMsg->srcId.addr); return -1; } - ASSERT(pMgr != NULL); (void)syncLogReplMgrProcessReply(pMgr, ths, pMsg); } return 0; diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 1af31efd5d..f1d401759b 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -90,7 +90,7 @@ SyncTerm syncLogReplMgrGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, S return prevLogTerm; } - if (pMgr->startIndex <= prevIndex && prevIndex < pMgr->endIndex) { + if (pMgr && pMgr->startIndex <= prevIndex && prevIndex < pMgr->endIndex) { int64_t timeMs = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].timeMs; ASSERT(timeMs != 0 && "no log entry found"); prevLogTerm = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].term; @@ -277,7 +277,11 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); - ret = 0; + SyncTerm term = syncLogReplMgrGetPrevLogTerm(NULL, pNode, index + 1); + ASSERT(pEntry->term >= 0); + if (term == pEntry->term) { + ret = 0; + } goto _out; } @@ -655,7 +659,7 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod pMgr->states[pMsg->lastSendIndex % pMgr->size].acked = true; - if (pMsg->matchIndex == pMsg->lastSendIndex) { + if (pMsg->success && pMsg->matchIndex == pMsg->lastSendIndex) { pMgr->restored = true; sInfo("vgId:%d, sync log repl mgr restored. peer: %s:%d (%" PRIx64 "), repl mgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64 "), log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", @@ -664,6 +668,17 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod return 0; } + if (pMsg->success == false && pMsg->matchIndex >= pMsg->lastSendIndex) { + sError("vgId:%d, failed to rollback match index. peer: %s:%d, match index: %" PRId64 ", last sent: %" PRId64, pNode->vgId, + host, port, pMsg->matchIndex, pMsg->lastSendIndex); + if (syncNodeStartSnapshot(pNode, &destId) < 0) { + sError("vgId:%d, failed to start snapshot for peer %s:%d", pNode->vgId, host, port); + return -1; + } + sInfo("vgId:%d, snapshot replication to rollback. peer: %s:%d", pNode->vgId, host, port); + return 0; + } + (void)syncLogReplMgrReset(pMgr); } @@ -681,7 +696,7 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod sError("vgId:%d, failed to start snapshot for peer %s:%d", pNode->vgId, host, port); return -1; } - sInfo("vgId:%d, snapshot replication to peer %s:%d started", pNode->vgId, host, port); + sInfo("vgId:%d, snapshot replication to peer %s:%d", pNode->vgId, host, port); return 0; } -- GitLab