diff --git a/source/libs/sync/inc/syncRaftLog.h b/source/libs/sync/inc/syncRaftLog.h index df5cd3f36c4138e608e70bd22972d54baff48a50..b54cd14ccdf5fe9e613dcf639caa20a0b0cdf35b 100644 --- a/source/libs/sync/inc/syncRaftLog.h +++ b/source/libs/sync/inc/syncRaftLog.h @@ -39,6 +39,8 @@ char* logStore2Str(SSyncLogStore* pLogStore); cJSON* logStoreSimple2Json(SSyncLogStore* pLogStore); char* logStoreSimple2Str(SSyncLogStore* pLogStore); +SyncIndex logStoreFirstIndex(SSyncLogStore* pLogStore); + // SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore); // SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore); // SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 4ebe142315c130074b4b7c49bd106067ed61b1b5..fd31bfc148710e69d6099ec487d256dc3c371eb3 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -454,16 +454,16 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs SyncIndex localPreLogIndex; SyncTerm localPreLogTerm; - ret = syncNodeGetPreIndexTerm(ths, pMsg->prevLogIndex + 1, &localPreLogIndex, &localPreLogTerm); - ASSERT(ret == 0); - - SyncIndex localLastIndex; - SyncTerm localLastTerm; - ret = syncNodeGetLastIndexTerm(ths, &localLastIndex, &localLastTerm); - ASSERT(ret == 0); bool logOK; - if (syncNodeIsIndexInSnapshot(ths, pMsg->prevLogIndex)) { + + SyncIndex logFirstIndex = logStoreFirstIndex(ths->pLogStore); + SSnapshot snapshot; + ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); + if (logFirstIndex > snapshot.lastApplyIndex) { + logOK = false; + + } else if (syncNodeIsIndexInSnapshot(ths, pMsg->prevLogIndex)) { SSnapshot snapshot; ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); @@ -478,6 +478,9 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs logOK, pMsg->prevLogIndex, snapshot.lastApplyIndex, pMsg->prevLogTerm, snapshot.lastApplyTerm); } else { + ret = syncNodeGetPreIndexTerm(ths, pMsg->prevLogIndex + 1, &localPreLogIndex, &localPreLogTerm); + ASSERT(ret == 0); + logOK = (pMsg->prevLogIndex == SYNC_INDEX_INVALID) || ((pMsg->prevLogIndex >= SYNC_INDEX_BEGIN) && (pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) && @@ -510,6 +513,21 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs return ret; } + SyncIndex localLastIndex; + SyncTerm localLastTerm; + if (logFirstIndex == SYNC_INDEX_INVALID) { + localLastIndex = ths->pLogStore->getLastIndex(ths->pLogStore); + localLastTerm = ths->pLogStore->getLastTerm(ths->pLogStore); + + } else if (logFirstIndex > snapshot.lastApplyIndex) { + localLastIndex = ths->pLogStore->getLastIndex(ths->pLogStore); + localLastTerm = ths->pLogStore->getLastTerm(ths->pLogStore); + + } else { + ret = syncNodeGetLastIndexTerm(ths, &localLastIndex, &localLastTerm); + ASSERT(ret == 0); + } + // return to follower state if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) { sTrace( diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index a15786e7582ad0214728192295a8f5934faa0808..28b7a0bbf0242dd377d036ccdd9d15a1ad772f48 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -258,6 +258,12 @@ char* logStoreSimple2Str(SSyncLogStore* pLogStore) { return serialized; } +SyncIndex logStoreFirstIndex(SSyncLogStore* pLogStore) { + SSyncLogStoreData* pData = pLogStore->data; + SWal* pWal = pData->pWal; + return walGetFirstVer(pWal); +} + // for debug ----------------- void logStorePrint(SSyncLogStore* pLogStore) { char* serialized = logStore2Str(pLogStore); diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index bdb55f3a7c1064d0c015797f658dd8a90cae7ef9..4b9a8c51aaaed7c61f49bd71b518ca3adfce7b34 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -151,8 +151,8 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) { if ((syncNodeIsIndexInSnapshot(pSyncNode, nextIndex - 1) && !snapshotSendingFinish) || syncNodeIsIndexInSnapshot(pSyncNode, nextIndex)) { // will send this msg until snapshot receive finish! - SSnapshot snapshot; - pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); + + SSnapshot snapshot = pSender->snapshot; sInfo("nextIndex:%ld in snapshot: , begin snapshot", nextIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm); @@ -162,6 +162,9 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) { preLogIndex = snapshot.lastApplyIndex; preLogTerm = snapshot.lastApplyTerm; + // update next index! + syncIndexMgrSetIndex(pSyncNode->pNextIndex, pDestId, snapshot.lastApplyIndex + 1); + // to claim leader SyncAppendEntries* pMsg = syncAppendEntriesBuild(0, pSyncNode->vgId); assert(pMsg != NULL); diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index c894a34fc042887e3117f560123addfb2cb6e0b2..5e7661b4774061167842c879076d0fa5bdb62412 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -41,6 +41,8 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI pSender->pSyncNode = pSyncNode; pSender->replicaIndex = replicaIndex; pSender->term = pSyncNode->pRaftStore->currentTerm; + pSender->pSyncNode->pFsm->FpGetSnapshot(pSender->pSyncNode->pFsm, &(pSender->snapshot)); + pSender->finish = false; } else { sInfo("snapshotSenderCreate cannot create sender");