From 44994201b6f143d7a292727d705001668bf8f0e9 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 2 Jun 2022 19:47:06 +0800 Subject: [PATCH] fix(sync): sending snapshot --- source/libs/sync/inc/syncRaftLog.h | 2 ++ source/libs/sync/src/syncAppendEntries.c | 34 ++++++++++++++++++------ source/libs/sync/src/syncRaftLog.c | 6 +++++ source/libs/sync/src/syncReplication.c | 7 +++-- source/libs/sync/src/syncSnapshot.c | 2 ++ 5 files changed, 41 insertions(+), 10 deletions(-) diff --git a/source/libs/sync/inc/syncRaftLog.h b/source/libs/sync/inc/syncRaftLog.h index df5cd3f36c..b54cd14ccd 100644 --- a/source/libs/sync/inc/syncRaftLog.h +++ b/source/libs/sync/inc/syncRaftLog.h @@ -39,6 +39,8 @@ char* logStore2Str(SSyncLogStore* pLogStore); cJSON* logStoreSimple2Json(SSyncLogStore* pLogStore); char* logStoreSimple2Str(SSyncLogStore* pLogStore); +SyncIndex logStoreFirstIndex(SSyncLogStore* pLogStore); + // SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore); // SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore); // SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 4ebe142315..fd31bfc148 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -454,16 +454,16 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs SyncIndex localPreLogIndex; SyncTerm localPreLogTerm; - ret = syncNodeGetPreIndexTerm(ths, pMsg->prevLogIndex + 1, &localPreLogIndex, &localPreLogTerm); - ASSERT(ret == 0); - - SyncIndex localLastIndex; - SyncTerm localLastTerm; - ret = syncNodeGetLastIndexTerm(ths, &localLastIndex, &localLastTerm); - ASSERT(ret == 0); bool logOK; - if (syncNodeIsIndexInSnapshot(ths, pMsg->prevLogIndex)) { + + SyncIndex logFirstIndex = logStoreFirstIndex(ths->pLogStore); + SSnapshot snapshot; + ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); + if (logFirstIndex > snapshot.lastApplyIndex) { + logOK = false; + + } else if (syncNodeIsIndexInSnapshot(ths, pMsg->prevLogIndex)) { SSnapshot snapshot; ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); @@ -478,6 +478,9 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs logOK, pMsg->prevLogIndex, snapshot.lastApplyIndex, pMsg->prevLogTerm, snapshot.lastApplyTerm); } else { + ret = syncNodeGetPreIndexTerm(ths, pMsg->prevLogIndex + 1, &localPreLogIndex, &localPreLogTerm); + ASSERT(ret == 0); + logOK = (pMsg->prevLogIndex == SYNC_INDEX_INVALID) || ((pMsg->prevLogIndex >= SYNC_INDEX_BEGIN) && (pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) && @@ -510,6 +513,21 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs return ret; } + SyncIndex localLastIndex; + SyncTerm localLastTerm; + if (logFirstIndex == SYNC_INDEX_INVALID) { + localLastIndex = ths->pLogStore->getLastIndex(ths->pLogStore); + localLastTerm = ths->pLogStore->getLastTerm(ths->pLogStore); + + } else if (logFirstIndex > snapshot.lastApplyIndex) { + localLastIndex = ths->pLogStore->getLastIndex(ths->pLogStore); + localLastTerm = ths->pLogStore->getLastTerm(ths->pLogStore); + + } else { + ret = syncNodeGetLastIndexTerm(ths, &localLastIndex, &localLastTerm); + ASSERT(ret == 0); + } + // return to follower state if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) { sTrace( diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index a15786e758..28b7a0bbf0 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -258,6 +258,12 @@ char* logStoreSimple2Str(SSyncLogStore* pLogStore) { return serialized; } +SyncIndex logStoreFirstIndex(SSyncLogStore* pLogStore) { + SSyncLogStoreData* pData = pLogStore->data; + SWal* pWal = pData->pWal; + return walGetFirstVer(pWal); +} + // for debug ----------------- void logStorePrint(SSyncLogStore* pLogStore) { char* serialized = logStore2Str(pLogStore); diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index bdb55f3a7c..4b9a8c51aa 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -151,8 +151,8 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) { if ((syncNodeIsIndexInSnapshot(pSyncNode, nextIndex - 1) && !snapshotSendingFinish) || syncNodeIsIndexInSnapshot(pSyncNode, nextIndex)) { // will send this msg until snapshot receive finish! - SSnapshot snapshot; - pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); + + SSnapshot snapshot = pSender->snapshot; sInfo("nextIndex:%ld in snapshot: , begin snapshot", nextIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm); @@ -162,6 +162,9 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) { preLogIndex = snapshot.lastApplyIndex; preLogTerm = snapshot.lastApplyTerm; + // update next index! + syncIndexMgrSetIndex(pSyncNode->pNextIndex, pDestId, snapshot.lastApplyIndex + 1); + // to claim leader SyncAppendEntries* pMsg = syncAppendEntriesBuild(0, pSyncNode->vgId); assert(pMsg != NULL); diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index c894a34fc0..5e7661b477 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -41,6 +41,8 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI pSender->pSyncNode = pSyncNode; pSender->replicaIndex = replicaIndex; pSender->term = pSyncNode->pRaftStore->currentTerm; + pSender->pSyncNode->pFsm->FpGetSnapshot(pSender->pSyncNode->pFsm, &(pSender->snapshot)); + pSender->finish = false; } else { sInfo("snapshotSenderCreate cannot create sender"); -- GitLab