From b9e4543a40c33bf0baf2fce95a229ca17d3d7e55 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 30 May 2022 21:21:51 +0800 Subject: [PATCH] enh(sync): add SyncSnapshotRsp SyncSnapshotSend --- source/libs/sync/inc/syncInt.h | 4 ++-- source/libs/sync/src/syncAppendEntries.c | 6 +++++- source/libs/sync/src/syncCommit.c | 6 +++++- source/libs/sync/src/syncMain.c | 13 ++++++++----- 4 files changed, 20 insertions(+), 9 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 41463e2f7f..fbe31ef031 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -162,8 +162,8 @@ typedef struct SSyncNode { // restore state // sem_t restoreSem; - bool restoreFinish; - SSnapshot* pSnapshot; + bool restoreFinish; + // SSnapshot* pSnapshot; SSyncSnapshotSender* senders[TSDB_MAX_REPLICA]; SSyncSnapshotReceiver* receivers[TSDB_MAX_REPLICA]; diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index fc20d6dd87..967ed5a00d 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -335,8 +335,12 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { cbMeta.currentTerm = ths->pRaftStore->currentTerm; cbMeta.flag = 0x11; + SSnapshot snapshot; + ASSERT(ths->pFsm->FpGetSnapshot != NULL); + ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); + bool needExecute = true; - if (ths->pSnapshot != NULL && cbMeta.index <= ths->pSnapshot->lastApplyIndex) { + if (cbMeta.index <= snapshot.lastApplyIndex) { needExecute = false; } diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 4a1a40a2d7..b683ab12b4 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -113,8 +113,12 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm; cbMeta.flag = 0x1; + SSnapshot snapshot; + ASSERT(pSyncNode->pFsm->FpGetSnapshot != NULL); + pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); + bool needExecute = true; - if (pSyncNode->pSnapshot != NULL && cbMeta.index <= pSyncNode->pSnapshot->lastApplyIndex) { + if (cbMeta.index <= snapshot.lastApplyIndex) { needExecute = false; } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 947e952276..65101e0215 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -575,11 +575,12 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { // restore state pSyncNode->restoreFinish = false; - pSyncNode->pSnapshot = NULL; - if (pSyncNode->pFsm->FpGetSnapshot != NULL) { - pSyncNode->pSnapshot = taosMemoryMalloc(sizeof(SSnapshot)); - pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, pSyncNode->pSnapshot); - } + + // pSyncNode->pSnapshot = NULL; + // if (pSyncNode->pFsm->FpGetSnapshot != NULL) { + // pSyncNode->pSnapshot = taosMemoryMalloc(sizeof(SSnapshot)); + // pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, pSyncNode->pSnapshot); + // } // tsem_init(&(pSyncNode->restoreSem), 0, 0); // start in syncNodeStart @@ -673,9 +674,11 @@ void syncNodeClose(SSyncNode* pSyncNode) { taosMemoryFree(pSyncNode->pFsm); } + /* if (pSyncNode->pSnapshot != NULL) { taosMemoryFree(pSyncNode->pSnapshot); } + */ // tsem_destroy(&pSyncNode->restoreSem); -- GitLab