diff --git a/source/libs/sync/inc/syncPipeline.h b/source/libs/sync/inc/syncPipeline.h index d8697da64b6b5f2738950966be1a8474f7b9e358..9a68b1e0c8c8f0d66258181c59acdf497cec84d9 100644 --- a/source/libs/sync/inc/syncPipeline.h +++ b/source/libs/sync/inc/syncPipeline.h @@ -89,14 +89,15 @@ int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Syn int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode); int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode); int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); -int32_t syncLogReplMgrProcessReplyInNormalMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); -int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode); +int32_t syncLogReplMgrProcessReplyInNormalMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); int32_t syncLogReplMgrProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg); +int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode); // SSyncLogBuffer SSyncLogBuffer* syncLogBufferCreate(); void syncLogBufferDestroy(SSyncLogBuffer* pBuf); int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode); +int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode); // access int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 8177f3b6db2ac068c3dc0e6bc20e7529f948dccd..a4c0022d987b080dd68dc52fa770c6689e633b29 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -704,6 +704,28 @@ static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { return ret; } +int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) { + ASSERT(pNode->pLogStore != NULL && "log store not created"); + ASSERT(pNode->pFsm != NULL && "pFsm not registered"); + ASSERT(pNode->pFsm->FpGetSnapshotInfo != NULL && "FpGetSnapshotInfo not registered"); + SSnapshot snapshot; + if (pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot) < 0) { + sError("vgId:%d, failed to get snapshot info since %s", pNode->vgId, terrstr()); + return -1; + } + SyncIndex commitIndex = snapshot.lastApplyIndex; + SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore); + SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore); + if (lastVer < commitIndex || firstVer > commitIndex + 1) { + if (pNode->pLogStore->syncLogRestoreFromSnapshot(pNode->pLogStore, commitIndex)) { + sError("vgId:%d, failed to restore log store from snapshot since %s. lastVer: %" PRId64 ", snapshotVer: %" PRId64, + pNode->vgId, terrstr(), lastVer, commitIndex); + return -1; + } + } + return 0; +} + // open/close -------------- SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode)); @@ -912,6 +934,9 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { } pSyncNode->commitIndex = commitIndex; + if (syncNodeLogStoreRestoreOnNeed(pSyncNode) < 0) { + goto _error; + } // timer ms init pSyncNode->pingBaseLine = PING_TIMER_MS; pSyncNode->electBaseLine = ELECT_TIMER_MS_MIN; diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index a31812e51e81b682d77f2c0c039474910af3b5aa..71ac5d1464d5efa70d18854850e6980e66a6ff9a 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -139,8 +139,7 @@ int32_t syncLogValidateAlignmentOfCommit(SSyncNode* pNode, SyncIndex commitIndex return 0; } -int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) { - taosThreadMutexLock(&pBuf->mutex); +int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) { ASSERT(pNode->pLogStore != NULL && "log store not created"); ASSERT(pNode->pFsm != NULL && "pFsm not registered"); ASSERT(pNode->pFsm->FpGetSnapshotInfo != NULL && "FpGetSnapshotInfo not registered"); @@ -226,14 +225,37 @@ int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) { // validate syncLogBufferValidate(pBuf); - taosThreadMutexUnlock(&pBuf->mutex); return 0; _err: - taosThreadMutexUnlock(&pBuf->mutex); return -1; } +int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) { + taosThreadMutexLock(&pBuf->mutex); + int32_t ret = syncLogBufferInitWithoutLock(pBuf, pNode); + taosThreadMutexUnlock(&pBuf->mutex); + return ret; +} + +int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) { + taosThreadMutexLock(&pBuf->mutex); + for (SyncIndex index = pBuf->startIndex; index < pBuf->endIndex; index++) { + SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem; + if (pEntry == NULL) continue; + syncEntryDestroy(pEntry); + pEntry = NULL; + memset(&pBuf->entries[(index + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0])); + } + pBuf->startIndex = pBuf->commitIndex = pBuf->matchIndex = pBuf->endIndex = 0; + int32_t ret = syncLogBufferInitWithoutLock(pBuf, pNode); + if (ret < 0) { + sError("vgId:%d, failed to re-initialize sync log buffer since %s.", pNode->vgId, terrstr()); + } + taosThreadMutexUnlock(&pBuf->mutex); + return ret; +} + FORCE_INLINE SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf) { SyncIndex index = pBuf->matchIndex; SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem; @@ -628,7 +650,7 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod // check existence of WAl log SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore); - if (pMsg->matchIndex < firstVer) { + if (pMsg->matchIndex + 1 < firstVer) { if (syncNodeStartSnapshot(pNode, &destId) < 0) { sError("vgId:%d, failed to start snapshot for dest: 0x%016" PRIx64, pNode->vgId, destId.addr); } @@ -929,7 +951,8 @@ int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Syn if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) { SSyncLogReplMgr* pMgr = syncNodeGetLogReplMgr(pNode, pDestId); if (pMgr) { - sInfo("vgId:%d, reset log repl mgr for dest: 0x%016" PRIx64, pNode->vgId, pDestId->addr); + sInfo("vgId:%d, reset sync log repl mgr for peer: 0x%016" PRIx64 " since %s. index: %" PRId64, pNode->vgId, + pDestId->addr, terrstr(), index); (void)syncLogReplMgrReset(pMgr); } } diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 222b7c4e1ef97a3c82fcb419597875ef4f1dab0d..1035925c2b3d59ccc90a59aac2e9e27f04581add 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -16,6 +16,7 @@ #define _DEFAULT_SOURCE #include "syncSnapshot.h" #include "syncIndexMgr.h" +#include "syncPipeline.h" #include "syncRaftCfg.h" #include "syncRaftLog.h" #include "syncRaftStore.h" @@ -273,6 +274,11 @@ int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) { return 1; } + char host[64]; + uint16_t port; + syncUtilU642Addr(pDestId->addr, host, sizeof(host), &port); + sInfo("vgId:%d, start snapshot for peer: %s:%d", pSyncNode->vgId, host, port); + code = snapshotSenderStart(pSender); if (code != 0) { sNError(pSyncNode, "snapshot sender start error"); @@ -372,7 +378,10 @@ int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapsh } int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pPreMsg) { - ASSERT(!snapshotReceiverIsStart(pReceiver)); + if (snapshotReceiverIsStart(pReceiver)) { + sWarn("vgId:%d, snapshot receiver has started.", pReceiver->pSyncNode->vgId); + return 0; + } pReceiver->start = true; pReceiver->ack = SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT; @@ -738,6 +747,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) { syncNodeOnSnapshotEnd(pSyncNode, pMsg); + (void)syncLogBufferReInit(pSyncNode->pLogBuf, pSyncNode); } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) { // force close, no response diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 10fb2ee97e38defcd2ff2cc4d2c0cccd31f6dde2..4a1b2d1444f1815ac02c02fa210e4e11f3da18fb 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -70,7 +70,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) { pWal->lastRollSeq = -1; taosArrayClear(pWal->fileInfoSet); - pWal->vers.firstVer = -1; + pWal->vers.firstVer = ver + 1; pWal->vers.lastVer = ver; pWal->vers.commitVer = ver; pWal->vers.snapshotVer = ver;