From 18b78f2450e3d296adc218c6f31b5436fd071aaa Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 29 Jun 2022 14:50:59 +0800 Subject: [PATCH] refactor(sync): add return code restore from snapshot --- source/libs/sync/src/syncRaftLog.c | 127 +++++++++++++++------------- source/libs/sync/src/syncSnapshot.c | 52 +++++++++--- 2 files changed, 105 insertions(+), 74 deletions(-) diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 9c4d527aed..a574c8cc1e 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -17,23 +17,27 @@ #include "syncRaftCfg.h" #include "syncRaftStore.h" -// refactor, log[0 .. n] ==> log[m .. n] +//------------------------------- +// log[m .. n] + +// public function static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncIndex snapshotIndex); static SyncIndex raftLogBeginIndex(struct SSyncLogStore* pLogStore); static SyncIndex raftLogEndIndex(struct SSyncLogStore* pLogStore); static SyncIndex raftLogWriteIndex(struct SSyncLogStore* pLogStore); static bool raftLogIsEmpty(struct SSyncLogStore* pLogStore); static int32_t raftLogEntryCount(struct SSyncLogStore* pLogStore); - static SyncIndex raftLogLastIndex(struct SSyncLogStore* pLogStore); static SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore); static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry); static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry); static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIndex); +// private function static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** ppLastEntry); //------------------------------- +// log[0 .. n] static SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore); static SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore); static SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore); @@ -43,6 +47,65 @@ static int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex from static int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index); static SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore); +//------------------------------- +SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { + SSyncLogStore* pLogStore = taosMemoryMalloc(sizeof(SSyncLogStore)); + ASSERT(pLogStore != NULL); + + pLogStore->data = taosMemoryMalloc(sizeof(SSyncLogStoreData)); + ASSERT(pLogStore->data != NULL); + + SSyncLogStoreData* pData = pLogStore->data; + pData->pSyncNode = pSyncNode; + pData->pWal = pSyncNode->pWal; + ASSERT(pData->pWal != NULL); + + taosThreadMutexInit(&(pData->mutex), NULL); + pData->pWalHandle = walOpenReadHandle(pData->pWal); + ASSERT(pData->pWalHandle != NULL); + + pLogStore->appendEntry = logStoreAppendEntry; + pLogStore->getEntry = logStoreGetEntry; + pLogStore->truncate = logStoreTruncate; + pLogStore->getLastIndex = logStoreLastIndex; + pLogStore->getLastTerm = logStoreLastTerm; + pLogStore->updateCommitIndex = logStoreUpdateCommitIndex; + pLogStore->getCommitIndex = logStoreGetCommitIndex; + + pLogStore->syncLogRestoreFromSnapshot = raftLogRestoreFromSnapshot; + pLogStore->syncLogBeginIndex = raftLogBeginIndex; + pLogStore->syncLogEndIndex = raftLogEndIndex; + pLogStore->syncLogIsEmpty = raftLogIsEmpty; + pLogStore->syncLogEntryCount = raftLogEntryCount; + pLogStore->syncLogLastIndex = raftLogLastIndex; + pLogStore->syncLogLastTerm = raftLogLastTerm; + pLogStore->syncLogAppendEntry = raftLogAppendEntry; + pLogStore->syncLogGetEntry = raftLogGetEntry; + pLogStore->syncLogTruncate = raftLogTruncate; + pLogStore->syncLogWriteIndex = raftLogWriteIndex; + + return pLogStore; +} + +void logStoreDestory(SSyncLogStore* pLogStore) { + if (pLogStore != NULL) { + SSyncLogStoreData* pData = pLogStore->data; + + taosThreadMutexLock(&(pData->mutex)); + if (pData->pWalHandle != NULL) { + walCloseReadHandle(pData->pWalHandle); + pData->pWalHandle = NULL; + } + taosThreadMutexUnlock(&(pData->mutex)); + taosThreadMutexDestroy(&(pData->mutex)); + + taosMemoryFree(pLogStore->data); + taosMemoryFree(pLogStore); + } +} + +//------------------------------- +// log[m .. n] static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncIndex snapshotIndex) { ASSERT(snapshotIndex >= 0); @@ -61,7 +124,7 @@ static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncI err, errStr, sysErr, sysErrStr); syncNodeErrorLog(pData->pSyncNode, logBuf); - ASSERT(0); + return -1; } return 0; @@ -265,63 +328,7 @@ static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** pp } //------------------------------- -SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { - SSyncLogStore* pLogStore = taosMemoryMalloc(sizeof(SSyncLogStore)); - ASSERT(pLogStore != NULL); - - pLogStore->data = taosMemoryMalloc(sizeof(SSyncLogStoreData)); - ASSERT(pLogStore->data != NULL); - - SSyncLogStoreData* pData = pLogStore->data; - pData->pSyncNode = pSyncNode; - pData->pWal = pSyncNode->pWal; - ASSERT(pData->pWal != NULL); - - taosThreadMutexInit(&(pData->mutex), NULL); - pData->pWalHandle = walOpenReadHandle(pData->pWal); - ASSERT(pData->pWalHandle != NULL); - - pLogStore->appendEntry = logStoreAppendEntry; - pLogStore->getEntry = logStoreGetEntry; - pLogStore->truncate = logStoreTruncate; - pLogStore->getLastIndex = logStoreLastIndex; - pLogStore->getLastTerm = logStoreLastTerm; - pLogStore->updateCommitIndex = logStoreUpdateCommitIndex; - pLogStore->getCommitIndex = logStoreGetCommitIndex; - - pLogStore->syncLogRestoreFromSnapshot = raftLogRestoreFromSnapshot; - pLogStore->syncLogBeginIndex = raftLogBeginIndex; - pLogStore->syncLogEndIndex = raftLogEndIndex; - pLogStore->syncLogIsEmpty = raftLogIsEmpty; - pLogStore->syncLogEntryCount = raftLogEntryCount; - pLogStore->syncLogLastIndex = raftLogLastIndex; - pLogStore->syncLogLastTerm = raftLogLastTerm; - pLogStore->syncLogAppendEntry = raftLogAppendEntry; - pLogStore->syncLogGetEntry = raftLogGetEntry; - pLogStore->syncLogTruncate = raftLogTruncate; - pLogStore->syncLogWriteIndex = raftLogWriteIndex; - - return pLogStore; -} - -void logStoreDestory(SSyncLogStore* pLogStore) { - if (pLogStore != NULL) { - SSyncLogStoreData* pData = pLogStore->data; - - taosThreadMutexLock(&(pData->mutex)); - if (pData->pWalHandle != NULL) { - walCloseReadHandle(pData->pWalHandle); - pData->pWalHandle = NULL; - } - taosThreadMutexUnlock(&(pData->mutex)); - taosThreadMutexDestroy(&(pData->mutex)); - - taosMemoryFree(pLogStore->data); - taosMemoryFree(pLogStore); - } -} - -//------------------------------- +// log[0 .. n] int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 24fe8564ea..70f3801e61 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -508,32 +508,51 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) { } while (0); } -static void snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) { +static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) { ASSERT(pMsg->seq == SYNC_SNAPSHOT_SEQ_END); + int32_t code = 0; if (pReceiver->pWriter != NULL) { - int32_t code = 0; + // write data if (pMsg->dataLen > 0) { code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen); - ASSERT(code == 0); + if (code != 0) { + syncNodeErrorLog(pReceiver->pSyncNode, "snapshot write error"); + return -1; + } } + // reset wal + code = + pReceiver->pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pReceiver->pSyncNode->pLogStore, pMsg->lastIndex); + if (code != 0) { + syncNodeErrorLog(pReceiver->pSyncNode, "wal restore from snapshot error"); + return -1; + } + + // update commit index + if (pReceiver->snapshot.lastApplyIndex > pReceiver->pSyncNode->commitIndex) { + pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex; + } + + // stop writer code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true); - ASSERT(code == 0); + if (code != 0) { + syncNodeErrorLog(pReceiver->pSyncNode, "snapshot stop writer true error"); + ASSERT(0); + return -1; + } pReceiver->pWriter = NULL; - } - pReceiver->ack = SYNC_SNAPSHOT_SEQ_END; + // update progress + pReceiver->ack = SYNC_SNAPSHOT_SEQ_END; - // update commit index - if (pReceiver->snapshot.lastApplyIndex > pReceiver->pSyncNode->commitIndex) { - pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex; + } else { + syncNodeErrorLog(pReceiver->pSyncNode, "snapshot stop writer true error"); + return -1; } - // reset wal - pReceiver->pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pReceiver->pSyncNode->pLogStore, pMsg->lastIndex); - // event log do { SSnapshot snapshot; @@ -542,6 +561,8 @@ static void snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapsho syncNodeEventLog(pReceiver->pSyncNode, eventLog); taosMemoryFree(eventLog); } while (0); + + return 0; } static void snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) { @@ -642,6 +663,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { // get receiver SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver; bool needRsp = false; + int32_t code = 0; // state, term, seq/ack if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) { @@ -653,8 +675,10 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) { // end, finish FSM - snapshotReceiverFinish(pReceiver, pMsg); - snapshotReceiverStop(pReceiver); + code = snapshotReceiverFinish(pReceiver, pMsg); + if (code == 0) { + snapshotReceiverStop(pReceiver); + } needRsp = true; // maybe update lastconfig -- GitLab