From 91238e823ebaaa3fce08ec88988503f0741c4766 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Fri, 1 Jul 2022 14:22:14 +0800 Subject: [PATCH] refactor(sync): add snapshot2 interface --- include/libs/sync/sync.h | 1 + source/libs/sync/inc/syncAppendEntriesReply.h | 5 + source/libs/sync/inc/syncReplication.h | 5 - source/libs/sync/src/syncAppendEntriesReply.c | 126 +++++++++--------- source/libs/sync/src/syncReplication.c | 36 +---- 5 files changed, 72 insertions(+), 101 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 9a4a00251c..bd148fd158 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -165,6 +165,7 @@ typedef struct SSyncLogStore { bool (*syncLogIsEmpty)(struct SSyncLogStore* pLogStore); int32_t (*syncLogEntryCount)(struct SSyncLogStore* pLogStore); int32_t (*syncLogRestoreFromSnapshot)(struct SSyncLogStore* pLogStore, SyncIndex index); + bool (*syncLogExist)(struct SSyncLogStore* pLogStore, SyncIndex index); SyncIndex (*syncLogWriteIndex)(struct SSyncLogStore* pLogStore); SyncIndex (*syncLogLastIndex)(struct SSyncLogStore* pLogStore); diff --git a/source/libs/sync/inc/syncAppendEntriesReply.h b/source/libs/sync/inc/syncAppendEntriesReply.h index 03148252fb..70ce5a72c6 100644 --- a/source/libs/sync/inc/syncAppendEntriesReply.h +++ b/source/libs/sync/inc/syncAppendEntriesReply.h @@ -44,6 +44,11 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); +typedef struct SReaderParam { + SyncIndex start; + SyncIndex end; +} SReaderParam; + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncReplication.h b/source/libs/sync/inc/syncReplication.h index d6aa52523c..07e12460da 100644 --- a/source/libs/sync/inc/syncReplication.h +++ b/source/libs/sync/inc/syncReplication.h @@ -58,11 +58,6 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode); int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg); int32_t syncNodeAppendEntriesBatch(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntriesBatch* pMsg); -typedef struct SReaderParam { - SyncIndex start; - SyncIndex end; -} SReaderParam; - #ifdef __cplusplus } #endif diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 39803587e4..827b2a59a5 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -108,67 +108,29 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p return ret; } -#if 0 -int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { - int32_t ret = 0; - - char logBuf[128] = {0}; - snprintf(logBuf, sizeof(logBuf), "==syncNodeOnAppendEntriesReplyCb== term:%lu", ths->pRaftStore->currentTerm); - syncAppendEntriesReplyLog2(logBuf, pMsg); - - if (pMsg->term < ths->pRaftStore->currentTerm) { - sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term, - ths->pRaftStore->currentTerm); - return ret; - } - - syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== before pNextIndex", ths->pNextIndex); - syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== before pMatchIndex", ths->pMatchIndex); - - // no need this code, because if I receive reply.term, then I must have sent for that term. - // if (pMsg->term > ths->pRaftStore->currentTerm) { - // syncNodeUpdateTerm(ths, pMsg->term); - // } - - if (pMsg->term > ths->pRaftStore->currentTerm) { - char logBuf[128] = {0}; - snprintf(logBuf, sizeof(logBuf), "syncNodeOnAppendEntriesReplyCb error term, receive:%lu current:%lu", pMsg->term, - ths->pRaftStore->currentTerm); - syncNodeLog2(logBuf, ths); - sError("%s", logBuf); - return ret; - } - - ASSERT(pMsg->term == ths->pRaftStore->currentTerm); - - if (pMsg->success) { - // nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1] - syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1); - - // matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex] - syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex); - - // maybe commit - syncMaybeAdvanceCommitIndex(ths); +// only start once +static void syncNodeStartSnapshot(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, SyncTerm lastApplyTerm, + SyncAppendEntriesReply* pMsg) { + // get sender + SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId)); + ASSERT(pSender != NULL); + + SSnapshot snapshot = { + .data = NULL, .lastApplyIndex = endIndex, .lastApplyTerm = lastApplyTerm, .lastConfigIndex = SYNC_INDEX_INVALID}; + + void* pReader = NULL; + SReaderParam readerParam = {.start = beginIndex, .end = endIndex}; + ths->pFsm->FpSnapshotStartRead(ths->pFsm, &pReader); + if (!snapshotSenderIsStart(pSender) && pMsg->privateTerm < pSender->privateTerm) { + ASSERT(pReader != NULL); + snapshotSenderStart(pSender, snapshot, pReader); } else { - SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); - - // notice! int64, uint64 - if (nextIndex > SYNC_INDEX_BEGIN) { - --nextIndex; - } else { - nextIndex = SYNC_INDEX_BEGIN; + if (pReader != NULL) { + ths->pFsm->FpSnapshotStopRead(ths->pFsm, pReader); } - syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex); } - - syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== after pNextIndex", ths->pNextIndex); - syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== after pMatchIndex", ths->pMatchIndex); - - return ret; } -#endif int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { int32_t ret = 0; @@ -187,6 +149,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie return -1; } + // error term if (pMsg->term > ths->pRaftStore->currentTerm) { char logBuf[128]; snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, error term, recv-term:%lu", pMsg->term); @@ -197,19 +160,29 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie ASSERT(pMsg->term == ths->pRaftStore->currentTerm); if (pMsg->success) { - // nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1] - syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1); + SyncIndex newNextIndex = pMsg->matchIndex + 1; + SyncIndex newMatchIndex = pMsg->matchIndex; - if (gRaftDetailLog) { - sTrace("update next match, index:%ld, success:%d", pMsg->matchIndex + 1, pMsg->success); - } + if (ths->pLogStore->syncLogExist(ths->pLogStore, newNextIndex) && + ths->pLogStore->syncLogExist(ths->pLogStore, newNextIndex - 1)) { + // nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1] + syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), newNextIndex); - // matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex] - syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex); + // matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex] + syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), newMatchIndex); - // maybe commit - if (ths->state == TAOS_SYNC_STATE_LEADER) { - syncMaybeAdvanceCommitIndex(ths); + // maybe commit + if (ths->state == TAOS_SYNC_STATE_LEADER) { + syncMaybeAdvanceCommitIndex(ths); + } + } else { + // start snapshot + SSnapshot snapshot; + ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot); + syncNodeStartSnapshot(ths, newMatchIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pMsg); + + syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), snapshot.lastApplyIndex + 1); + syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), newMatchIndex); } } else { @@ -218,6 +191,27 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie // notice! int64, uint64 if (nextIndex > SYNC_INDEX_BEGIN) { --nextIndex; + + if (ths->pLogStore->syncLogExist(ths->pLogStore, nextIndex) && + ths->pLogStore->syncLogExist(ths->pLogStore, nextIndex - 1)) { + // do nothing + } else { + SSyncRaftEntry* pEntry; + int32_t code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, nextIndex, &pEntry); + ASSERT(code == 0); + syncNodeStartSnapshot(ths, SYNC_INDEX_BEGIN, nextIndex, pEntry->term, pMsg); + + // get sender + SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId)); + ASSERT(pSender != NULL); + SyncIndex sentryIndex = pSender->snapshot.lastApplyIndex + 1; + + // update nextIndex to sentryIndex + if (nextIndex <= sentryIndex) { + nextIndex = sentryIndex; + } + } + } else { nextIndex = SYNC_INDEX_BEGIN; } diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index bc834786c6..4908822a3a 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -128,38 +128,14 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) { // next index SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId); - // if pre-entry not exist, create snapshot - SyncIndex preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex); - SSyncRaftEntry* pPreEntry = NULL; - int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preLogIndex, &pPreEntry); - if (code == -1 && terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) { - SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1}; - if (pSyncNode->pFsm->FpGetSnapshot != NULL) { - void* pReader = NULL; - SReaderParam readerParam = {.start = 0, .end = nextIndex}; - pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot, &readerParam, &pReader); - - // get sender - SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(pSyncNode, pDestId); - ASSERT(pSender != NULL); - - if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN && nextIndex <= snapshot.lastApplyIndex + 1 && - !snapshotSenderIsStart(pSender)) { - // start snapshot - snapshotSenderStart(pSender, snapshot, pReader); - } else { - // no snapshot - if (pReader != NULL) { - pSyncNode->pFsm->FpSnapshotStopRead(pSyncNode->pFsm, pReader); - } - } - } - } - // pre index, pre term - preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex); - SyncTerm preLogTerm = syncNodeGetPreTerm(pSyncNode, nextIndex); + SyncIndex preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex); + SyncTerm preLogTerm = syncNodeGetPreTerm(pSyncNode, nextIndex); if (preLogTerm == SYNC_TERM_INVALID) { + SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(pSyncNode, pDestId); + ASSERT(pSender != NULL); + ASSERT(!snapshotSenderIsStart(pSender)); + SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1; syncIndexMgrSetIndex(pSyncNode->pNextIndex, pDestId, newNextIndex); syncIndexMgrSetIndex(pSyncNode->pMatchIndex, pDestId, SYNC_INDEX_INVALID); -- GitLab