diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 199ef289f31948b0d21de1f0adf0304a648c9f16..3dbca1210d30d3fe2f2756adfa6a4f379d341bf8 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -610,6 +610,15 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod (void)syncLogReplMgrReset(pMgr); } + // check existence of WAl log + SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore); + if (pMsg->matchIndex < firstVer) { + if (syncNodeStartSnapshot(pNode, &destId) < 0) { + sError("vgId:%d, failed to start snapshot for dest: 0x%016" PRIx64, pNode->vgId, destId.addr); + } + return 0; + } + // send match index SyncIndex index = TMIN(pMsg->matchIndex, pNode->pLogBuf->matchIndex); bool barrier = false; @@ -901,6 +910,13 @@ int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Syn pEntry = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf); if (pEntry == NULL) { sError("vgId:%d, failed to get raft entry for index: %" PRId64 "", pNode->vgId, index); + if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) { + SSyncLogReplMgr* pMgr = syncNodeGetLogReplMgr(pNode, pDestId); + if (pMgr) { + sInfo("vgId:%d, reset log repl mgr for dest: 0x%016" PRIx64, pNode->vgId, pDestId->addr); + (void)syncLogReplMgrReset(pMgr); + } + } goto _err; } *pBarrier = syncLogIsReplicationBarrier(pEntry);