From 2e640e8e68c2aae790dbe69f50237b7dffc1955e Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Thu, 24 Nov 2022 10:25:06 +0800 Subject: [PATCH] feat: start snapshot in recovery mode of sync repl mgr with lastMatchTerm in reply msg --- source/libs/sync/inc/syncMessage.h | 2 +- source/libs/sync/inc/syncPipeline.h | 2 +- source/libs/sync/src/syncAppendEntries.c | 2 +- source/libs/sync/src/syncMain.c | 2 +- source/libs/sync/src/syncPipeline.c | 60 +++++++++++++++--------- source/libs/sync/src/syncUtil.c | 4 +- 6 files changed, 45 insertions(+), 27 deletions(-) diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index c5fdc27426..2af2e7b5cd 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -105,7 +105,7 @@ typedef struct SyncAppendEntriesReply { SRaftId destId; // private data SyncTerm term; - SyncTerm privateTerm; + SyncTerm lastMatchTerm; bool success; SyncIndex matchIndex; SyncIndex lastSendIndex; diff --git a/source/libs/sync/inc/syncPipeline.h b/source/libs/sync/inc/syncPipeline.h index 9a68b1e0c8..ca07876def 100644 --- a/source/libs/sync/inc/syncPipeline.h +++ b/source/libs/sync/inc/syncPipeline.h @@ -103,7 +103,7 @@ int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode); int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf); int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry); int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm); -int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode); +int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm); int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t commitIndex); int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 9634f4ee26..d719169470 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -206,7 +206,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) { accepted = true; _SEND_RESPONSE: - pReply->matchIndex = syncLogBufferProceed(ths->pLogBuf, ths); + pReply->matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, &pReply->lastMatchTerm); bool matched = (pReply->matchIndex >= pReply->lastSendIndex); if (accepted && matched) { pReply->success = true; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index a4c0022d98..108b9ab5bd 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2170,7 +2170,7 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) { } // proceed match index, with replicating on needed - SyncIndex matchIndex = syncLogBufferProceed(ths->pLogBuf, ths); + SyncIndex matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL); sDebug("vgId:%d, append raft entry. index: %" PRId64 ", term: %" PRId64 " pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 71ac5d1464..a7a983a06e 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -76,17 +76,17 @@ SyncTerm syncLogReplMgrGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, S if (prevIndex == -1) return 0; - if (index - 1 > pBuf->matchIndex) { + if (prevIndex > pBuf->matchIndex) { terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; return -1; } ASSERT(index - 1 == prevIndex); - if (index - 1 >= pBuf->startIndex) { - pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem; + if (prevIndex >= pBuf->startIndex) { + pEntry = pBuf->entries[(prevIndex + pBuf->size) % pBuf->size].pItem; ASSERT(pEntry != NULL && "no log entry found"); - prevLogTerm = pBuf->entries[(index + pBuf->size) % pBuf->size].prevLogTerm; + prevLogTerm = pEntry->term; return prevLogTerm; } @@ -354,7 +354,7 @@ int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { return 0; } -int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode) { +int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm) { taosThreadMutexLock(&pBuf->mutex); syncLogBufferValidate(pBuf); @@ -419,6 +419,9 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode) { _out: pBuf->matchIndex = matchIndex; + if (pMatchTerm) { + *pMatchTerm = pBuf->entries[(matchIndex + pBuf->size) % pBuf->size].pItem->term; + } syncLogBufferValidate(pBuf); taosThreadMutexUnlock(&pBuf->mutex); return matchIndex; @@ -615,16 +618,16 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod ASSERT(pMgr->restored == false); char host[64]; uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + syncUtilU642Addr(destId.addr, host, sizeof(host), &port); if (pMgr->endIndex == 0) { ASSERT(pMgr->startIndex == 0); ASSERT(pMgr->matchIndex == 0); if (pMsg->matchIndex < 0) { pMgr->restored = true; - sInfo("vgId:%d, sync log repl mgr of peer %s:%d restored. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64 + sInfo("vgId:%d, sync log repl mgr of peer %s:%d (%" PRIx64 ") restored. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64 "), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", - pNode->vgId, host, port, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, + pNode->vgId, host, port, destId.addr, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); return 0; } @@ -638,9 +641,9 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod if (pMsg->matchIndex == pMsg->lastSendIndex) { pMgr->restored = true; - sInfo("vgId:%d, sync log repl mgr of peer %s:%d restored. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64 + sInfo("vgId:%d, sync log repl mgr of peer %s:%d (%" PRIx64 ") restored. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64 "), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", - pNode->vgId, host, port, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, + pNode->vgId, host, port, destId.addr, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); return 0; } @@ -648,23 +651,38 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod (void)syncLogReplMgrReset(pMgr); } - // check existence of WAl log + // check last match term + SyncTerm term = -1; SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore); - if (pMsg->matchIndex + 1 < firstVer) { - if (syncNodeStartSnapshot(pNode, &destId) < 0) { - sError("vgId:%d, failed to start snapshot for dest: 0x%016" PRIx64, pNode->vgId, destId.addr); + SyncIndex index = TMIN(pMsg->matchIndex, pNode->pLogBuf->matchIndex); + + if (pMsg->matchIndex < pNode->pLogBuf->matchIndex) { + term = syncLogReplMgrGetPrevLogTerm(pMgr, pNode, index + 1); + + if (term < 0 || (term != pMsg->lastMatchTerm && (index + 1 == firstVer || index == firstVer))) { + ASSERT(term >= 0 || terrno == TSDB_CODE_WAL_LOG_NOT_EXIST); + if (syncNodeStartSnapshot(pNode, &destId) < 0) { + sError("vgId:%d, failed to start snapshot for peer %s:%d", pNode->vgId, host, port); + } + return 0; + } + + ASSERT(index + 1 >= firstVer); + + if (term == pMsg->lastMatchTerm) { + index = index + 1; + ASSERT(index <= pNode->pLogBuf->matchIndex); + } else { + ASSERT(index > firstVer); } - return 0; } - // send match index - SyncIndex index = TMIN(pMsg->matchIndex, pNode->pLogBuf->matchIndex); - bool barrier = false; - SyncTerm term = -1; + // attempt to replicate the raft log at index + bool barrier = false; ASSERT(index >= 0); if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, &destId, &barrier) < 0) { - sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId, - terrstr(), index, destId.addr); + sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", peer %s:%d", pNode->vgId, + terrstr(), index, host, port); return -1; } diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 97de188253..181ddffd5b 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -373,7 +373,7 @@ void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries sNTrace(pSyncNode, "send sync-append-entries-reply to %s:%d, {term:%" PRId64 ", pterm:%" PRId64 ", success:%d, lsend-index:%" PRId64 ", match:%" PRId64 "}, %s", - host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->lastSendIndex, pMsg->matchIndex, s); + host, port, pMsg->term, pMsg->lastMatchTerm, pMsg->success, pMsg->lastSendIndex, pMsg->matchIndex, s); } void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) { @@ -384,7 +384,7 @@ void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries sNTrace(pSyncNode, "recv sync-append-entries-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 ", success:%d, lsend-index:%" PRId64 ", match:%" PRId64 "}, %s", - host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->lastSendIndex, pMsg->matchIndex, s); + host, port, pMsg->term, pMsg->lastMatchTerm, pMsg->success, pMsg->lastSendIndex, pMsg->matchIndex, s); } void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) { -- GitLab