diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index 6a6838e388e8912ccf1c615bcf2f36a83b928e57..24f1196bed8b8a97b62025c73ca7abbb613e8325 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -550,7 +550,7 @@ typedef struct SyncPreSnapshotReply { // private data SyncTerm term; - SyncIndex matchIndex; + SyncIndex snapStart; } SyncPreSnapshotReply; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 8f5f0eef89a9f65807d6dac71f0593aa6289a0e4..b70d6d5f095d1cf2392e8ef9704d9fae693f118b 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -3702,8 +3702,9 @@ void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshot uint16_t port; syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), "send sync-pre-snapshot-reply to %s:%d {term:%" PRIu64 ", match:%" PRId64 "}, %s", - host, port, pMsg->term, pMsg->matchIndex, s); + snprintf(logBuf, sizeof(logBuf), + "send sync-pre-snapshot-reply to %s:%d {term:%" PRIu64 ", snap-start:%" PRId64 "}, %s", host, port, + pMsg->term, pMsg->snapStart, s); syncNodeEventLog(pSyncNode, logBuf); } @@ -3712,7 +3713,8 @@ void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshot uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), "recv sync-pre-snapshot-reply from %s:%d {term:%" PRIu64 ", match:%" PRId64 "}, %s", - host, port, pMsg->term, pMsg->matchIndex, s); + snprintf(logBuf, sizeof(logBuf), + "recv sync-pre-snapshot-reply from %s:%d {term:%" PRIu64 ", snap-start:%" PRId64 "}, %s", host, port, + pMsg->term, pMsg->snapStart, s); syncNodeEventLog(pSyncNode, logBuf); } diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index b50be7e6dc0f6ea48a12b84a7c2edbe839a3d7e6..d070d3e7443c3edd6683e6164d7d09f15e50075e 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -2568,8 +2568,8 @@ cJSON* syncPreSnapshotReply2Json(const SyncPreSnapshotReply* pMsg) { snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term); cJSON_AddStringToObject(pRoot, "term", u64buf); - snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->matchIndex); - cJSON_AddStringToObject(pRoot, "match-index", u64buf); + snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->snapStart); + cJSON_AddStringToObject(pRoot, "snap-start", u64buf); } cJSON* pJson = cJSON_CreateObject(); diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index ef18b29983cacf982e7cbf8924dc6ba5a3d9ac7e..6706e2f2134a635a676a8d6b55c099ccea3b725f 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -925,10 +925,60 @@ int32_t syncNodeOnSnapshotReply(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) { int32_t syncNodeOnPreSnapshot(SSyncNode *ths, SyncPreSnapshot *pMsg) { syncLogRecvSyncPreSnapshot(ths, pMsg, ""); + + SyncPreSnapshotReply *pMsgReply = syncPreSnapshotReplyBuild(ths->vgId); + pMsgReply->srcId = ths->myRaftId; + pMsgReply->destId = pMsg->srcId; + pMsgReply->term = ths->pRaftStore->currentTerm; + + SSyncLogStoreData *pData = ths->pLogStore->data; + SWal *pWal = pData->pWal; + + if (syncNodeIsMnode(ths)) { + pMsgReply->snapStart = SYNC_INDEX_BEGIN; + + } else { + bool isEmpty = ths->pLogStore->syncLogIsEmpty(ths->pLogStore); + int64_t walCommitVer = walGetCommittedVer(pWal); + + if (!isEmpty && ths->commitIndex != walCommitVer) { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "commit not same, wal-commit:%" PRId64 ", commit:%" PRId64 ", ignore", + walCommitVer, ths->commitIndex); + syncNodeErrorLog(ths, logBuf); + + goto _IGNORE; + } + + pMsgReply->snapStart = ths->commitIndex + 1; + + // make local log clean + int32_t code = ths->pLogStore->syncLogTruncate(ths->pLogStore, pMsgReply->snapStart); + if (code != 0) { + syncNodeErrorLog(ths, "truncate wal error"); + goto _IGNORE; + } + } + + // can not write behind _RESPONSE + SRpcMsg rpcMsg; + +_RESPONSE: + syncPreSnapshotReply2RpcMsg(pMsgReply, &rpcMsg); + syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg); + + syncPreSnapshotReplyDestroy(pMsgReply); + return 0; + +_IGNORE: + syncPreSnapshotReplyDestroy(pMsgReply); return 0; } int32_t syncNodeOnPreSnapshotReply(SSyncNode *ths, SyncPreSnapshotReply *pMsg) { syncLogRecvSyncPreSnapshotReply(ths, pMsg, ""); + + // start snapshot + return 0; } \ No newline at end of file diff --git a/source/libs/sync/test/syncPreSnapshotReplyTest.cpp b/source/libs/sync/test/syncPreSnapshotReplyTest.cpp index 7fa5c68a1d41d16d004f997f679a7113ef060be5..a30dcc2c545640a7bfe181727465a922107713a5 100644 --- a/source/libs/sync/test/syncPreSnapshotReplyTest.cpp +++ b/source/libs/sync/test/syncPreSnapshotReplyTest.cpp @@ -22,7 +22,7 @@ SyncPreSnapshotReply *createMsg() { pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); pMsg->destId.vgId = 100; pMsg->term = 9527; - pMsg->matchIndex = 12306; + pMsg->snapStart = 12306; return pMsg; }