From b80095dc870547ff30cecc16e471e88ec173c338 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Tue, 22 Nov 2022 22:24:20 +0800 Subject: [PATCH] enh: snapshot during recovery of SSynclogReplMgr --- source/libs/sync/src/syncPipeline.c | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 199ef289f3..3dbca1210d 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -610,6 +610,15 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod (void)syncLogReplMgrReset(pMgr); } + // check existence of WAl log + SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore); + if (pMsg->matchIndex < firstVer) { + if (syncNodeStartSnapshot(pNode, &destId) < 0) { + sError("vgId:%d, failed to start snapshot for dest: 0x%016" PRIx64, pNode->vgId, destId.addr); + } + return 0; + } + // send match index SyncIndex index = TMIN(pMsg->matchIndex, pNode->pLogBuf->matchIndex); bool barrier = false; @@ -901,6 +910,13 @@ int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Syn pEntry = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf); if (pEntry == NULL) { sError("vgId:%d, failed to get raft entry for index: %" PRId64 "", pNode->vgId, index); + if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) { + SSyncLogReplMgr* pMgr = syncNodeGetLogReplMgr(pNode, pDestId); + if (pMgr) { + sInfo("vgId:%d, reset log repl mgr for dest: 0x%016" PRIx64, pNode->vgId, pDestId->addr); + (void)syncLogReplMgrReset(pMgr); + } + } goto _err; } *pBarrier = syncLogIsReplicationBarrier(pEntry); -- GitLab