From 4526ff28769554b8c0c7aa002244052e46717226 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Thu, 17 Nov 2022 23:37:02 +0800 Subject: [PATCH] enh: add syncLogBuffer.h and syncLogBuffer.c --- source/libs/sync/inc/syncInt.h | 98 +- source/libs/sync/inc/syncLogBuffer.h | 129 +++ source/libs/sync/src/syncAppendEntries.c | 394 +------- source/libs/sync/src/syncAppendEntriesReply.c | 77 +- source/libs/sync/src/syncLogBuffer.c | 944 ++++++++++++++++++ source/libs/sync/src/syncMain.c | 455 +-------- source/libs/sync/src/syncReplication.c | 3 + source/os/src/osTimer.c | 2 +- 8 files changed, 1083 insertions(+), 1019 deletions(-) create mode 100644 source/libs/sync/inc/syncLogBuffer.h create mode 100644 source/libs/sync/src/syncLogBuffer.c diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 2199b9b65a..f204f7f4fe 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -50,6 +50,8 @@ typedef struct SyncPreSnapshotReply SyncPreSnapshotReply; typedef struct SyncHeartbeatReply SyncHeartbeatReply; typedef struct SyncHeartbeat SyncHeartbeat; typedef struct SyncPreSnapshot SyncPreSnapshot; +typedef struct SSyncLogBuffer SSyncLogBuffer; +typedef struct SSyncLogReplMgr SSyncLogReplMgr; typedef struct SRaftId { SyncNodeId addr; @@ -85,102 +87,6 @@ typedef struct SPeerState { int64_t lastSendTime; } SPeerState; -typedef struct SSyncReplInfo { - bool barrier; - bool acked; - int64_t timeMs; - int64_t term; -} SSyncReplInfo; - -typedef struct SSyncLogReplMgr { - SSyncReplInfo states[TSDB_SYNC_LOG_BUFFER_SIZE]; - int64_t startIndex; - int64_t matchIndex; - int64_t endIndex; - int64_t size; - bool restored; - int64_t peerStartTime; - int32_t retryBackoff; - int32_t peerId; -} SSyncLogReplMgr; - -SSyncLogReplMgr* syncLogReplMgrCreate(); -void syncLogReplMgrDestroy(SSyncLogReplMgr* pMgr); - -// access -static FORCE_INLINE int64_t syncLogGetRetryBackoffTimeMs(SSyncLogReplMgr* pMgr) { - return (1 << pMgr->retryBackoff) * SYNC_LOG_REPL_RETRY_WAIT_MS; -} - -static FORCE_INLINE int32_t syncLogGetNextRetryBackoff(SSyncLogReplMgr* pMgr) { - return TMIN(pMgr->retryBackoff + 1, SYNC_MAX_RETRY_BACKOFF); -} - -static FORCE_INLINE int32_t syncLogReplMgrUpdateTerm(SSyncLogReplMgr* pMgr, SyncIndex index, SyncTerm term) { - if (pMgr->endIndex == 0) return -1; - ASSERT(pMgr->startIndex <= index && index < pMgr->endIndex); - pMgr->states[(index + pMgr->size) % pMgr->size].term = term; - return 0; -} - -SyncTerm syncLogReplMgrGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index); -int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, - SRaftId* pDestId, bool* pBarrier); -int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); -int32_t syncLogBufferReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode); -int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode); -int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode); -int32_t syncLogResetLogReplMgr(SSyncLogReplMgr* pMgr); -int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); -int32_t syncLogReplMgrProcessReplyInNormalMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); -int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode); - -int32_t syncNodeSendAppendEntries(SSyncNode* pNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg); -int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg); -void syncLogDestroyAppendEntries(SRpcMsg* pRpcMsg); - -// others -bool syncLogReplMgrValidate(SSyncLogReplMgr* pMgr); - -typedef struct SSyncLogBufEntry { - SSyncRaftEntry* pItem; - SyncIndex prevLogIndex; - SyncTerm prevLogTerm; -} SSyncLogBufEntry; - -typedef struct SSyncLogBuffer { - SSyncLogBufEntry entries[TSDB_SYNC_LOG_BUFFER_SIZE]; - int64_t startIndex; - int64_t commitIndex; - int64_t matchIndex; - int64_t endIndex; - int64_t size; - TdThreadMutex mutex; -} SSyncLogBuffer; - -SSyncLogBuffer* syncLogBufferCreate(); -void syncLogBufferDestroy(SSyncLogBuffer* pBuf); -int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode); - -// access -int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf); -int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry); -int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm); -int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode); -int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t commitIndex); - -int64_t syncNodeUpdateCommitIndex(SSyncNode* ths, SyncIndex commtIndex); -int32_t syncLogToAppendEntries(SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm, SRpcMsg* pRpcMsg); - -// private -SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index, bool* pInBuf); -int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf); -int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SyncIndex toIndex); -int32_t syncLogBufferReplicate(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm); -bool syncNodeAgreedUpon(SSyncNode* pNode, SyncIndex index); - -void syncIndexMgrSetIndex(SSyncIndexMgr* pSyncIndexMgr, const SRaftId* pRaftId, SyncIndex index); - typedef struct SSyncNode { // init by SSyncInfo SyncGroupId vgId; diff --git a/source/libs/sync/inc/syncLogBuffer.h b/source/libs/sync/inc/syncLogBuffer.h new file mode 100644 index 0000000000..4c209549b4 --- /dev/null +++ b/source/libs/sync/inc/syncLogBuffer.h @@ -0,0 +1,129 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_LIBS_SYNC_LOG_BUFFER_H +#define _TD_LIBS_SYNC_LOG_BUFFER_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "syncInt.h" + +typedef struct SSyncReplInfo { + bool barrier; + bool acked; + int64_t timeMs; + int64_t term; +} SSyncReplInfo; + +typedef struct SSyncLogReplMgr { + SSyncReplInfo states[TSDB_SYNC_LOG_BUFFER_SIZE]; + int64_t startIndex; + int64_t matchIndex; + int64_t endIndex; + int64_t size; + bool restored; + int64_t peerStartTime; + int32_t retryBackoff; + int32_t peerId; +} SSyncLogReplMgr; + +SSyncLogReplMgr* syncLogReplMgrCreate(); +void syncLogReplMgrDestroy(SSyncLogReplMgr* pMgr); +int32_t syncNodeLogReplMgrInit(SSyncNode* pNode); +void syncNodeLogReplMgrDestroy(SSyncNode* pNode); + +// access +static FORCE_INLINE int64_t syncLogGetRetryBackoffTimeMs(SSyncLogReplMgr* pMgr) { + return (1 << pMgr->retryBackoff) * SYNC_LOG_REPL_RETRY_WAIT_MS; +} + +static FORCE_INLINE int32_t syncLogGetNextRetryBackoff(SSyncLogReplMgr* pMgr) { + return TMIN(pMgr->retryBackoff + 1, SYNC_MAX_RETRY_BACKOFF); +} + +static FORCE_INLINE int32_t syncLogReplMgrUpdateTerm(SSyncLogReplMgr* pMgr, SyncIndex index, SyncTerm term) { + if (pMgr->endIndex == 0) return -1; + ASSERT(pMgr->startIndex <= index && index < pMgr->endIndex); + pMgr->states[(index + pMgr->size) % pMgr->size].term = term; + return 0; +} + +SyncTerm syncLogReplMgrGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index); +int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, + SRaftId* pDestId, bool* pBarrier); +int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); +int32_t syncLogBufferReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode); +int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode); +int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode); +int32_t syncLogResetLogReplMgr(SSyncLogReplMgr* pMgr); +int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); +int32_t syncLogReplMgrProcessReplyInNormalMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); +int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode); +int32_t syncLogReplMgrProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg); + +int32_t syncNodeSendAppendEntries(SSyncNode* pNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg); +int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg); +void syncLogDestroyAppendEntries(SRpcMsg* pRpcMsg); + +// others +bool syncLogReplMgrValidate(SSyncLogReplMgr* pMgr); + +typedef struct SSyncLogBufEntry { + SSyncRaftEntry* pItem; + SyncIndex prevLogIndex; + SyncTerm prevLogTerm; +} SSyncLogBufEntry; + +typedef struct SSyncLogBuffer { + SSyncLogBufEntry entries[TSDB_SYNC_LOG_BUFFER_SIZE]; + int64_t startIndex; + int64_t commitIndex; + int64_t matchIndex; + int64_t endIndex; + int64_t size; + TdThreadMutex mutex; +} SSyncLogBuffer; + +SSyncLogBuffer* syncLogBufferCreate(); +void syncLogBufferDestroy(SSyncLogBuffer* pBuf); +int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode); + +// access +int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf); +int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry); +int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm); +int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode); +int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t commitIndex); +int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode); + +// private +SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index, bool* pInBuf); +int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf); +int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SyncIndex toIndex); +int32_t syncLogBufferReplicate(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm); + +// others +bool syncNodeAgreedUpon(SSyncNode* pNode, SyncIndex index); +void syncIndexMgrSetIndex(SSyncIndexMgr* pSyncIndexMgr, const SRaftId* pRaftId, SyncIndex index); +int64_t syncNodeUpdateCommitIndex(SSyncNode* ths, SyncIndex commtIndex); +int32_t syncLogToAppendEntries(SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm, SRpcMsg* pRpcMsg); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_LIBS_SYNC_LOG_BUFFER_H*/ diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 8197119b87..4eff19ced8 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "syncAppendEntries.h" +#include "syncLogBuffer.h" #include "syncMessage.h" #include "syncRaftLog.h" #include "syncRaftStore.h" @@ -125,187 +126,6 @@ int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex) { return 0; } -SSyncRaftEntry* syncEntryBuildDummy(SyncTerm term, SyncIndex index, int32_t vgId) { - return syncEntryBuildNoop(term, index, vgId); -} - -int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) { - taosThreadMutexLock(&pBuf->mutex); - ASSERT(pNode->pLogStore != NULL && "log store not created"); - ASSERT(pNode->pFsm != NULL && "pFsm not registered"); - ASSERT(pNode->pFsm->FpGetSnapshotInfo != NULL && "FpGetSnapshotInfo not registered"); - - SSnapshot snapshot; - if (pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot) < 0) { - sError("vgId:%d, failed to get snapshot info since %s", pNode->vgId, terrstr()); - goto _err; - } - SyncIndex commitIndex = snapshot.lastApplyIndex; - SyncTerm commitTerm = snapshot.lastApplyTerm; - - SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore); - if (lastVer < commitIndex) { - sError("vgId:%d, lastVer of WAL log less than tsdb commit version. lastVer: %" PRId64 - ", tsdb commit version: %" PRId64 "", - pNode->vgId, lastVer, commitIndex); - terrno = TSDB_CODE_WAL_LOG_INCOMPLETE; - goto _err; - } - - ASSERT(lastVer >= commitIndex); - SyncIndex toIndex = lastVer; - // update match index - pBuf->commitIndex = commitIndex; - pBuf->matchIndex = toIndex; - pBuf->endIndex = toIndex + 1; - - // load log entries in reverse order - SSyncLogStore* pLogStore = pNode->pLogStore; - SyncIndex index = toIndex; - SSyncRaftEntry* pEntry = NULL; - bool takeDummy = false; - - while (true) { - if (index <= pBuf->commitIndex) { - takeDummy = true; - break; - } - - if (pLogStore->syncLogGetEntry(pLogStore, index, &pEntry) < 0) { - sError("vgId:%d, failed to get log entry since %s. index:%" PRId64 "", pNode->vgId, terrstr(), index); - ASSERT(0); - break; - } - - bool taken = false; - if (toIndex <= index + pBuf->size - 1) { - SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = -1, .prevLogTerm = -1}; - pBuf->entries[index % pBuf->size] = tmp; - taken = true; - } - - if (index < toIndex) { - pBuf->entries[(index + 1) % pBuf->size].prevLogIndex = pEntry->index; - pBuf->entries[(index + 1) % pBuf->size].prevLogTerm = pEntry->term; - } - - if (!taken) { - syncEntryDestroy(pEntry); - pEntry = NULL; - break; - } - - index--; - } - - // put a dummy record at commitIndex if present in log buffer - if (takeDummy) { - ASSERT(index == pBuf->commitIndex); - - SSyncRaftEntry* pDummy = syncEntryBuildDummy(commitTerm, commitIndex, pNode->vgId); - if (pDummy == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - SSyncLogBufEntry tmp = {.pItem = pDummy, .prevLogIndex = commitIndex - 1, .prevLogTerm = commitTerm}; - pBuf->entries[(commitIndex + pBuf->size) % pBuf->size] = tmp; - - if (index < toIndex) { - pBuf->entries[(index + 1) % pBuf->size].prevLogIndex = commitIndex; - pBuf->entries[(index + 1) % pBuf->size].prevLogTerm = commitTerm; - } - } - - // update startIndex - pBuf->startIndex = takeDummy ? index : index + 1; - - // validate - syncLogBufferValidate(pBuf); - taosThreadMutexUnlock(&pBuf->mutex); - return 0; - -_err: - taosThreadMutexUnlock(&pBuf->mutex); - return -1; -} - -FORCE_INLINE SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf) { - SyncIndex index = pBuf->matchIndex; - SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem; - ASSERT(pEntry != NULL); - return pEntry->term; -} - -int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm) { - taosThreadMutexLock(&pBuf->mutex); - syncLogBufferValidate(pBuf); - int32_t ret = -1; - SyncIndex index = pEntry->index; - SyncIndex prevIndex = pEntry->index - 1; - SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTerm(pBuf); - - if (index <= pBuf->commitIndex) { - sInfo("vgId:%d, raft entry already committed. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64 - " %" PRId64 " %" PRId64 ", %" PRId64 ")", - pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, - pBuf->endIndex); - ret = 0; - goto _out; - } - - if (index - pBuf->startIndex >= pBuf->size) { - sInfo("vgId:%d, raft entry out of buffer capacity. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64 - " %" PRId64 " %" PRId64 ", %" PRId64 ")", - pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, - pBuf->endIndex); - goto _out; - } - - if (index > pBuf->matchIndex && lastMatchTerm != prevTerm) { - sInfo("vgId:%d, not ready to accept raft entry. index: %" PRId64 ", term: %" PRId64 ": prevterm: %" PRId64 - " != lastmatch: %" PRId64 ". log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", - pNode->vgId, pEntry->index, pEntry->term, prevTerm, lastMatchTerm, pBuf->startIndex, pBuf->commitIndex, - pBuf->matchIndex, pBuf->endIndex); - goto _out; - } - - // check current in buffer - SSyncRaftEntry* pExist = pBuf->entries[index % pBuf->size].pItem; - if (pExist != NULL) { - ASSERT(pEntry->index == pExist->index); - - if (pEntry->term != pExist->term) { - (void)syncLogBufferRollback(pBuf, index); - } else { - sDebug("vgId:%d, duplicate raft entry received. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64 - " %" PRId64 " %" PRId64 ", %" PRId64 ")", - pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, - pBuf->endIndex); - SyncTerm existPrevTerm = pBuf->entries[index % pBuf->size].prevLogTerm; - ASSERT(pEntry->term == pExist->term && prevTerm == existPrevTerm); - ret = 0; - goto _out; - } - } - - // update - SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = prevIndex, .prevLogTerm = prevTerm}; - pEntry = NULL; - pBuf->entries[index % pBuf->size] = tmp; - - // update end index - pBuf->endIndex = TMAX(index + 1, pBuf->endIndex); - - // success - ret = 0; - -_out: - syncEntryDestroy(pEntry); - syncLogBufferValidate(pBuf); - taosThreadMutexUnlock(&pBuf->mutex); - return ret; -} - SSyncRaftEntry* syncLogAppendEntriesToRaftEntry(const SyncAppendEntries* pMsg) { SSyncRaftEntry* pEntry = taosMemoryMalloc(pMsg->dataLen); if (pEntry == NULL) { @@ -317,218 +137,6 @@ SSyncRaftEntry* syncLogAppendEntriesToRaftEntry(const SyncAppendEntries* pMsg) { return pEntry; } -int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { - ASSERT(pEntry->index >= 0); - SyncIndex lastVer = pLogStore->syncLogLastIndex(pLogStore); - if (lastVer >= pEntry->index && pLogStore->syncLogTruncate(pLogStore, pEntry->index) < 0) { - sError("failed to truncate log store since %s. from index:%" PRId64 "", terrstr(), pEntry->index); - return -1; - } - lastVer = pLogStore->syncLogLastIndex(pLogStore); - ASSERT(pEntry->index == lastVer + 1); - - if (pLogStore->syncLogAppendEntry(pLogStore, pEntry) < 0) { - sError("failed to append raft log entry since %s. index:%" PRId64 ", term:%" PRId64 "", terrstr(), pEntry->index, - pEntry->term); - return -1; - } - - lastVer = pLogStore->syncLogLastIndex(pLogStore); - ASSERT(pEntry->index == lastVer); - return 0; -} - -int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode) { - taosThreadMutexLock(&pBuf->mutex); - syncLogBufferValidate(pBuf); - - SSyncLogStore* pLogStore = pNode->pLogStore; - int64_t matchIndex = pBuf->matchIndex; - - while (pBuf->matchIndex + 1 < pBuf->endIndex) { - int64_t index = pBuf->matchIndex + 1; - ASSERT(index >= 0); - - // try to proceed - SSyncLogBufEntry* pBufEntry = &pBuf->entries[index % pBuf->size]; - SyncIndex prevLogIndex = pBufEntry->prevLogIndex; - SyncTerm prevLogTerm = pBufEntry->prevLogTerm; - SSyncRaftEntry* pEntry = pBufEntry->pItem; - if (pEntry == NULL) { - sDebug("vgId:%d, cannot proceed match index in log buffer. no raft entry at next pos of matchIndex:%" PRId64, - pNode->vgId, pBuf->matchIndex); - goto _out; - } - - ASSERT(index == pEntry->index); - - // match - SSyncRaftEntry* pMatch = pBuf->entries[(pBuf->matchIndex + pBuf->size) % pBuf->size].pItem; - ASSERT(pMatch != NULL); - ASSERT(pMatch->index == pBuf->matchIndex); - ASSERT(pMatch->index + 1 == pEntry->index); - ASSERT(prevLogIndex == pMatch->index); - - if (pMatch->term != prevLogTerm) { - sInfo( - "vgId:%d, mismatching raft log entries encountered. " - "{ index:%" PRId64 ", term:%" PRId64 - " } " - "{ index:%" PRId64 ", term:%" PRId64 ", prevLogIndex:%" PRId64 ", prevLogTerm:%" PRId64 " } ", - pNode->vgId, pMatch->index, pMatch->term, pEntry->index, pEntry->term, prevLogIndex, prevLogTerm); - goto _out; - } - - // increase match index - pBuf->matchIndex = index; - - sInfo("vgId:%d, log buffer proceed. start index: %" PRId64 ", match index: %" PRId64 ", end index: %" PRId64, - pNode->vgId, pBuf->startIndex, pBuf->matchIndex, pBuf->endIndex); - - // replicate on demand - (void)syncNodeReplicate(pNode); - - // persist - if (syncLogStorePersist(pLogStore, pEntry) < 0) { - sError("vgId:%d, failed to persist raft log entry from log buffer since %s. index:%" PRId64, pNode->vgId, - terrstr(), pEntry->index); - goto _out; - } - ASSERT(pEntry->index == pBuf->matchIndex); - - // update my match index - matchIndex = pBuf->matchIndex; - syncIndexMgrSetIndex(pNode->pMatchIndex, &pNode->myRaftId, pBuf->matchIndex); - } // end of while - -_out: - pBuf->matchIndex = matchIndex; - syncLogBufferValidate(pBuf); - taosThreadMutexUnlock(&pBuf->mutex); - return matchIndex; -} - -int32_t syncLogFsmExecute(SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry) { - ASSERT(pFsm->FpCommitCb != NULL && "No commit cb registered for the FSM"); - - SRpcMsg rpcMsg; - syncEntry2OriginalRpc(pEntry, &rpcMsg); - - SFsmCbMeta cbMeta = {0}; - cbMeta.index = pEntry->index; - cbMeta.lastConfigIndex = -1; - cbMeta.isWeak = pEntry->isWeak; - cbMeta.code = 0; - cbMeta.state = role; - cbMeta.seqNum = pEntry->seqNum; - cbMeta.term = pEntry->term; - cbMeta.currentTerm = term; - cbMeta.flag = -1; - - pFsm->FpCommitCb(pFsm, &rpcMsg, &cbMeta); - return 0; -} - -int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf) { - ASSERT(pBuf->startIndex <= pBuf->matchIndex); - ASSERT(pBuf->commitIndex <= pBuf->matchIndex); - ASSERT(pBuf->matchIndex < pBuf->endIndex); - ASSERT(pBuf->endIndex - pBuf->startIndex <= pBuf->size); - ASSERT(pBuf->entries[(pBuf->matchIndex + pBuf->size) % pBuf->size].pItem); - return 0; -} - -int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t commitIndex) { - taosThreadMutexLock(&pBuf->mutex); - syncLogBufferValidate(pBuf); - - SSyncLogStore* pLogStore = pNode->pLogStore; - SSyncFSM* pFsm = pNode->pFsm; - ESyncState role = pNode->state; - SyncTerm term = pNode->pRaftStore->currentTerm; - SyncGroupId vgId = pNode->vgId; - int32_t ret = 0; - int64_t upperIndex = TMIN(commitIndex, pBuf->matchIndex); - SSyncRaftEntry* pEntry = NULL; - bool inBuf = false; - - if (commitIndex <= pBuf->commitIndex) { - sDebug("vgId:%d, stale commit update. current:%" PRId64 ", notified:%" PRId64 "", vgId, pBuf->commitIndex, - commitIndex); - ret = 0; - goto _out; - } - - sDebug("vgId:%d, log buffer info. role: %d, term: %" PRId64 ". start index:%" PRId64 ", commit index:%" PRId64 - ", match index: %" PRId64 ", end index:%" PRId64 "", - pNode->vgId, role, term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); - - // execute in fsm - for (int64_t index = pBuf->commitIndex + 1; index <= upperIndex; index++) { - // get a log entry - pEntry = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf); - if (pEntry == NULL) { - goto _out; - } - - // execute it - if (!syncUtilUserCommit(pEntry->originalRpcType)) { - sInfo("vgId:%d, non-user msg in raft log entry. index: %" PRId64 ", term:%" PRId64 "", vgId, pEntry->index, - pEntry->term); - pBuf->commitIndex = index; - if (!inBuf) { - syncEntryDestroy(pEntry); - pEntry = NULL; - } - continue; - } - - if (syncLogFsmExecute(pFsm, role, term, pEntry) != 0) { - sError("vgId:%d, failed to execute raft entry in FSM. log index:%" PRId64 ", term:%" PRId64 "", vgId, - pEntry->index, pEntry->term); - ret = -1; - goto _out; - } - pBuf->commitIndex = index; - - sDebug("vgId:%d, committed index: %" PRId64 ", term: %" PRId64 ", role: %d, current term: %" PRId64 "", pNode->vgId, - pEntry->index, pEntry->term, role, term); - - if (!inBuf) { - syncEntryDestroy(pEntry); - pEntry = NULL; - } - } - - // recycle - SyncIndex used = pBuf->endIndex - pBuf->startIndex; - SyncIndex until = pBuf->commitIndex - (pBuf->size - used) / 2; - for (SyncIndex index = pBuf->startIndex; index < until; index++) { - SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem; - ASSERT(pEntry != NULL); - syncEntryDestroy(pEntry); - memset(&pBuf->entries[(index + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0])); - pBuf->startIndex = index + 1; - } - -_out: - // mark as restored if needed - if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex) { - pNode->pFsm->FpRestoreFinishCb(pNode->pFsm); - pNode->restoreFinish = true; - sInfo("vgId:%d, restore finished. pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, - pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); - } - - if (!inBuf) { - syncEntryDestroy(pEntry); - pEntry = NULL; - } - syncLogBufferValidate(pBuf); - taosThreadMutexUnlock(&pBuf->mutex); - return ret; -} - int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) { SyncAppendEntries* pMsg = pRpcMsg->pCont; SRpcMsg rpcRsp = {0}; diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 3295f03016..36eb6fefc7 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -17,6 +17,7 @@ #include "syncAppendEntriesReply.h" #include "syncCommit.h" #include "syncIndexMgr.h" +#include "syncLogBuffer.h" #include "syncMessage.h" #include "syncRaftEntry.h" #include "syncRaftStore.h" @@ -56,82 +57,6 @@ int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely) { return ths->commitIndex; } -SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index, bool* pInBuf) { - SSyncRaftEntry* pEntry = NULL; - if (index >= pBuf->endIndex) { - return NULL; - } - if (index > pBuf->startIndex) { // startIndex might be dummy - *pInBuf = true; - pEntry = pBuf->entries[index % pBuf->size].pItem; - } else { - *pInBuf = false; - if (pNode->pLogStore->syncLogGetEntry(pNode->pLogStore, index, &pEntry) < 0) { - sError("vgId:%d, failed to get log entry since %s. index:%" PRId64 "", pNode->vgId, terrstr(), index); - } - } - return pEntry; -} - -bool syncLogReplMgrValidate(SSyncLogReplMgr* pMgr) { - ASSERT(pMgr->startIndex <= pMgr->endIndex); - for (SyncIndex index = pMgr->startIndex; index < pMgr->endIndex; index++) { - ASSERT(pMgr->states[(index + pMgr->size) % pMgr->size].barrier == false || index + 1 == pMgr->endIndex); - } - return true; -} - -int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, - SRaftId* pDestId, bool* pBarrier) { - SSyncRaftEntry* pEntry = NULL; - SRpcMsg msgOut = {0}; - bool inBuf = false; - int32_t ret = -1; - SyncTerm prevLogTerm = -1; - SSyncLogBuffer* pBuf = pNode->pLogBuf; - - pEntry = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf); - if (pEntry == NULL) { - sError("vgId:%d, failed to get raft entry for index: %" PRId64 "", pNode->vgId, index); - goto _err; - } - *pBarrier = syncLogIsReplicationBarrier(pEntry); - - prevLogTerm = syncLogReplMgrGetPrevLogTerm(pMgr, pNode, index); - if (prevLogTerm < 0) { - sError("vgId:%d, failed to get prev log term since %s. index: %" PRId64 "", pNode->vgId, terrstr(), index); - goto _err; - } - if (pTerm) *pTerm = pEntry->term; - - int32_t code = syncLogToAppendEntries(pNode, pEntry, prevLogTerm, &msgOut); - if (code < 0) { - sError("vgId:%d, failed to get append entries for index:%" PRId64 "", pNode->vgId, index); - goto _err; - } - - (void)syncNodeSendAppendEntries(pNode, pDestId, &msgOut); - ret = 0; - - sInfo("vgId:%d, replicate one msg index: %" PRId64 " term: %" PRId64 " prevterm: %" PRId64 " to dest: 0x%016" PRIx64, - pNode->vgId, pEntry->index, pEntry->term, prevLogTerm, pDestId->addr); - - if (!inBuf) { - syncEntryDestroy(pEntry); - pEntry = NULL; - } - return 0; - -_err: - rpcFreeCont(msgOut.pCont); - msgOut.pCont = NULL; - if (!inBuf) { - syncEntryDestroy(pEntry); - pEntry = NULL; - } - return -1; -} - int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { SyncAppendEntriesReply* pMsg = pRpcMsg->pCont; int32_t ret = 0; diff --git a/source/libs/sync/src/syncLogBuffer.c b/source/libs/sync/src/syncLogBuffer.c new file mode 100644 index 0000000000..36f33fa46e --- /dev/null +++ b/source/libs/sync/src/syncLogBuffer.c @@ -0,0 +1,944 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE + +#include "syncLogBuffer.h" +#include "syncInt.h" +#include "syncRaftEntry.h" +#include "syncRaftStore.h" +#include "syncReplication.h" +#include "syncUtil.h" + +int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf) { + taosThreadMutexLock(&pBuf->mutex); + int64_t index = pBuf->endIndex; + taosThreadMutexUnlock(&pBuf->mutex); + return index; +} + +int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry) { + taosThreadMutexLock(&pBuf->mutex); + syncLogBufferValidate(pBuf); + SyncIndex index = pEntry->index; + + if (index - pBuf->startIndex >= pBuf->size) { + sError("vgId:%d, failed to append due to log buffer full. index:%" PRId64 "", pNode->vgId, index); + goto _out; + } + + ASSERT(index == pBuf->endIndex); + + SSyncRaftEntry* pExist = pBuf->entries[index % pBuf->size].pItem; + ASSERT(pExist == NULL); + + // initial log buffer with at least one item, e.g. commitIndex + SSyncRaftEntry* pMatch = pBuf->entries[(index - 1 + pBuf->size) % pBuf->size].pItem; + ASSERT(pMatch != NULL && "no matched raft log entry"); + ASSERT(pMatch->index + 1 == index); + + SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = pMatch->index, .prevLogTerm = pMatch->term}; + pBuf->entries[index % pBuf->size] = tmp; + pBuf->endIndex = index + 1; + + syncLogBufferValidate(pBuf); + taosThreadMutexUnlock(&pBuf->mutex); + return 0; + +_out: + syncLogBufferValidate(pBuf); + syncEntryDestroy(pEntry); + taosThreadMutexUnlock(&pBuf->mutex); + return -1; +} + +SyncTerm syncLogReplMgrGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) { + SSyncLogBuffer* pBuf = pNode->pLogBuf; + SSyncRaftEntry* pEntry = NULL; + SyncIndex prevIndex = index - 1; + SyncTerm prevLogTerm = -1; + terrno = TSDB_CODE_SUCCESS; + + if (prevIndex == -1) return 0; + + if (index - 1 > pBuf->matchIndex) { + terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; + return -1; + } + + ASSERT(index - 1 == prevIndex); + + if (index - 1 >= pBuf->startIndex) { + pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem; + ASSERT(pEntry != NULL && "no log entry found"); + prevLogTerm = pBuf->entries[(index + pBuf->size) % pBuf->size].prevLogTerm; + return prevLogTerm; + } + + if (pMgr->startIndex <= prevIndex && prevIndex < pMgr->endIndex) { + int64_t timeMs = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].timeMs; + ASSERT(timeMs != 0 && "no log entry found"); + prevLogTerm = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].term; + ASSERT(prevIndex == 0 || prevLogTerm != 0); + return prevLogTerm; + } + + SSnapshot snapshot; + if (pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot) == 0 && prevIndex == snapshot.lastApplyIndex) { + return snapshot.lastApplyTerm; + } + + if (pNode->pLogStore->syncLogGetEntry(pNode->pLogStore, prevIndex, &pEntry) == 0) { + prevLogTerm = pEntry->term; + syncEntryDestroy(pEntry); + pEntry = NULL; + return prevLogTerm; + } + + sError("vgId:%d, failed to get log term since %s. index: %" PRId64 "", pNode->vgId, terrstr(), prevIndex); + terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; + return -1; +} + +SSyncRaftEntry* syncEntryBuildDummy(SyncTerm term, SyncIndex index, int32_t vgId) { + return syncEntryBuildNoop(term, index, vgId); +} + +int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) { + taosThreadMutexLock(&pBuf->mutex); + ASSERT(pNode->pLogStore != NULL && "log store not created"); + ASSERT(pNode->pFsm != NULL && "pFsm not registered"); + ASSERT(pNode->pFsm->FpGetSnapshotInfo != NULL && "FpGetSnapshotInfo not registered"); + + SSnapshot snapshot; + if (pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot) < 0) { + sError("vgId:%d, failed to get snapshot info since %s", pNode->vgId, terrstr()); + goto _err; + } + SyncIndex commitIndex = snapshot.lastApplyIndex; + SyncTerm commitTerm = snapshot.lastApplyTerm; + + SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore); + if (lastVer < commitIndex) { + sError("vgId:%d, lastVer of WAL log less than tsdb commit version. lastVer: %" PRId64 + ", tsdb commit version: %" PRId64 "", + pNode->vgId, lastVer, commitIndex); + terrno = TSDB_CODE_WAL_LOG_INCOMPLETE; + goto _err; + } + + ASSERT(lastVer >= commitIndex); + SyncIndex toIndex = lastVer; + // update match index + pBuf->commitIndex = commitIndex; + pBuf->matchIndex = toIndex; + pBuf->endIndex = toIndex + 1; + + // load log entries in reverse order + SSyncLogStore* pLogStore = pNode->pLogStore; + SyncIndex index = toIndex; + SSyncRaftEntry* pEntry = NULL; + bool takeDummy = false; + + while (true) { + if (index <= pBuf->commitIndex) { + takeDummy = true; + break; + } + + if (pLogStore->syncLogGetEntry(pLogStore, index, &pEntry) < 0) { + sError("vgId:%d, failed to get log entry since %s. index:%" PRId64 "", pNode->vgId, terrstr(), index); + ASSERT(0); + break; + } + + bool taken = false; + if (toIndex <= index + pBuf->size - 1) { + SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = -1, .prevLogTerm = -1}; + pBuf->entries[index % pBuf->size] = tmp; + taken = true; + } + + if (index < toIndex) { + pBuf->entries[(index + 1) % pBuf->size].prevLogIndex = pEntry->index; + pBuf->entries[(index + 1) % pBuf->size].prevLogTerm = pEntry->term; + } + + if (!taken) { + syncEntryDestroy(pEntry); + pEntry = NULL; + break; + } + + index--; + } + + // put a dummy record at commitIndex if present in log buffer + if (takeDummy) { + ASSERT(index == pBuf->commitIndex); + + SSyncRaftEntry* pDummy = syncEntryBuildDummy(commitTerm, commitIndex, pNode->vgId); + if (pDummy == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + SSyncLogBufEntry tmp = {.pItem = pDummy, .prevLogIndex = commitIndex - 1, .prevLogTerm = commitTerm}; + pBuf->entries[(commitIndex + pBuf->size) % pBuf->size] = tmp; + + if (index < toIndex) { + pBuf->entries[(index + 1) % pBuf->size].prevLogIndex = commitIndex; + pBuf->entries[(index + 1) % pBuf->size].prevLogTerm = commitTerm; + } + } + + // update startIndex + pBuf->startIndex = takeDummy ? index : index + 1; + + // validate + syncLogBufferValidate(pBuf); + taosThreadMutexUnlock(&pBuf->mutex); + return 0; + +_err: + taosThreadMutexUnlock(&pBuf->mutex); + return -1; +} + +FORCE_INLINE SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf) { + SyncIndex index = pBuf->matchIndex; + SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem; + ASSERT(pEntry != NULL); + return pEntry->term; +} + +int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm) { + taosThreadMutexLock(&pBuf->mutex); + syncLogBufferValidate(pBuf); + int32_t ret = -1; + SyncIndex index = pEntry->index; + SyncIndex prevIndex = pEntry->index - 1; + SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTerm(pBuf); + + if (index <= pBuf->commitIndex) { + sInfo("vgId:%d, raft entry already committed. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64 + " %" PRId64 " %" PRId64 ", %" PRId64 ")", + pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, + pBuf->endIndex); + ret = 0; + goto _out; + } + + if (index - pBuf->startIndex >= pBuf->size) { + sInfo("vgId:%d, raft entry out of buffer capacity. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64 + " %" PRId64 " %" PRId64 ", %" PRId64 ")", + pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, + pBuf->endIndex); + goto _out; + } + + if (index > pBuf->matchIndex && lastMatchTerm != prevTerm) { + sInfo("vgId:%d, not ready to accept raft entry. index: %" PRId64 ", term: %" PRId64 ": prevterm: %" PRId64 + " != lastmatch: %" PRId64 ". log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", + pNode->vgId, pEntry->index, pEntry->term, prevTerm, lastMatchTerm, pBuf->startIndex, pBuf->commitIndex, + pBuf->matchIndex, pBuf->endIndex); + goto _out; + } + + // check current in buffer + SSyncRaftEntry* pExist = pBuf->entries[index % pBuf->size].pItem; + if (pExist != NULL) { + ASSERT(pEntry->index == pExist->index); + + if (pEntry->term != pExist->term) { + (void)syncLogBufferRollback(pBuf, index); + } else { + sDebug("vgId:%d, duplicate raft entry received. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64 + " %" PRId64 " %" PRId64 ", %" PRId64 ")", + pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, + pBuf->endIndex); + SyncTerm existPrevTerm = pBuf->entries[index % pBuf->size].prevLogTerm; + ASSERT(pEntry->term == pExist->term && prevTerm == existPrevTerm); + ret = 0; + goto _out; + } + } + + // update + SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = prevIndex, .prevLogTerm = prevTerm}; + pEntry = NULL; + pBuf->entries[index % pBuf->size] = tmp; + + // update end index + pBuf->endIndex = TMAX(index + 1, pBuf->endIndex); + + // success + ret = 0; + +_out: + syncEntryDestroy(pEntry); + syncLogBufferValidate(pBuf); + taosThreadMutexUnlock(&pBuf->mutex); + return ret; +} + +int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { + ASSERT(pEntry->index >= 0); + SyncIndex lastVer = pLogStore->syncLogLastIndex(pLogStore); + if (lastVer >= pEntry->index && pLogStore->syncLogTruncate(pLogStore, pEntry->index) < 0) { + sError("failed to truncate log store since %s. from index:%" PRId64 "", terrstr(), pEntry->index); + return -1; + } + lastVer = pLogStore->syncLogLastIndex(pLogStore); + ASSERT(pEntry->index == lastVer + 1); + + if (pLogStore->syncLogAppendEntry(pLogStore, pEntry) < 0) { + sError("failed to append raft log entry since %s. index:%" PRId64 ", term:%" PRId64 "", terrstr(), pEntry->index, + pEntry->term); + return -1; + } + + lastVer = pLogStore->syncLogLastIndex(pLogStore); + ASSERT(pEntry->index == lastVer); + return 0; +} + +int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode) { + taosThreadMutexLock(&pBuf->mutex); + syncLogBufferValidate(pBuf); + + SSyncLogStore* pLogStore = pNode->pLogStore; + int64_t matchIndex = pBuf->matchIndex; + + while (pBuf->matchIndex + 1 < pBuf->endIndex) { + int64_t index = pBuf->matchIndex + 1; + ASSERT(index >= 0); + + // try to proceed + SSyncLogBufEntry* pBufEntry = &pBuf->entries[index % pBuf->size]; + SyncIndex prevLogIndex = pBufEntry->prevLogIndex; + SyncTerm prevLogTerm = pBufEntry->prevLogTerm; + SSyncRaftEntry* pEntry = pBufEntry->pItem; + if (pEntry == NULL) { + sDebug("vgId:%d, cannot proceed match index in log buffer. no raft entry at next pos of matchIndex:%" PRId64, + pNode->vgId, pBuf->matchIndex); + goto _out; + } + + ASSERT(index == pEntry->index); + + // match + SSyncRaftEntry* pMatch = pBuf->entries[(pBuf->matchIndex + pBuf->size) % pBuf->size].pItem; + ASSERT(pMatch != NULL); + ASSERT(pMatch->index == pBuf->matchIndex); + ASSERT(pMatch->index + 1 == pEntry->index); + ASSERT(prevLogIndex == pMatch->index); + + if (pMatch->term != prevLogTerm) { + sInfo( + "vgId:%d, mismatching raft log entries encountered. " + "{ index:%" PRId64 ", term:%" PRId64 + " } " + "{ index:%" PRId64 ", term:%" PRId64 ", prevLogIndex:%" PRId64 ", prevLogTerm:%" PRId64 " } ", + pNode->vgId, pMatch->index, pMatch->term, pEntry->index, pEntry->term, prevLogIndex, prevLogTerm); + goto _out; + } + + // increase match index + pBuf->matchIndex = index; + + sInfo("vgId:%d, log buffer proceed. start index: %" PRId64 ", match index: %" PRId64 ", end index: %" PRId64, + pNode->vgId, pBuf->startIndex, pBuf->matchIndex, pBuf->endIndex); + + // replicate on demand + (void)syncNodeReplicate(pNode); + + // persist + if (syncLogStorePersist(pLogStore, pEntry) < 0) { + sError("vgId:%d, failed to persist raft log entry from log buffer since %s. index:%" PRId64, pNode->vgId, + terrstr(), pEntry->index); + goto _out; + } + ASSERT(pEntry->index == pBuf->matchIndex); + + // update my match index + matchIndex = pBuf->matchIndex; + syncIndexMgrSetIndex(pNode->pMatchIndex, &pNode->myRaftId, pBuf->matchIndex); + } // end of while + +_out: + pBuf->matchIndex = matchIndex; + syncLogBufferValidate(pBuf); + taosThreadMutexUnlock(&pBuf->mutex); + return matchIndex; +} + +int32_t syncLogFsmExecute(SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry) { + ASSERT(pFsm->FpCommitCb != NULL && "No commit cb registered for the FSM"); + + SRpcMsg rpcMsg; + syncEntry2OriginalRpc(pEntry, &rpcMsg); + + SFsmCbMeta cbMeta = {0}; + cbMeta.index = pEntry->index; + cbMeta.lastConfigIndex = -1; + cbMeta.isWeak = pEntry->isWeak; + cbMeta.code = 0; + cbMeta.state = role; + cbMeta.seqNum = pEntry->seqNum; + cbMeta.term = pEntry->term; + cbMeta.currentTerm = term; + cbMeta.flag = -1; + + pFsm->FpCommitCb(pFsm, &rpcMsg, &cbMeta); + return 0; +} + +int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf) { + ASSERT(pBuf->startIndex <= pBuf->matchIndex); + ASSERT(pBuf->commitIndex <= pBuf->matchIndex); + ASSERT(pBuf->matchIndex < pBuf->endIndex); + ASSERT(pBuf->endIndex - pBuf->startIndex <= pBuf->size); + ASSERT(pBuf->entries[(pBuf->matchIndex + pBuf->size) % pBuf->size].pItem); + return 0; +} + +int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t commitIndex) { + taosThreadMutexLock(&pBuf->mutex); + syncLogBufferValidate(pBuf); + + SSyncLogStore* pLogStore = pNode->pLogStore; + SSyncFSM* pFsm = pNode->pFsm; + ESyncState role = pNode->state; + SyncTerm term = pNode->pRaftStore->currentTerm; + SyncGroupId vgId = pNode->vgId; + int32_t ret = 0; + int64_t upperIndex = TMIN(commitIndex, pBuf->matchIndex); + SSyncRaftEntry* pEntry = NULL; + bool inBuf = false; + + if (commitIndex <= pBuf->commitIndex) { + sDebug("vgId:%d, stale commit update. current:%" PRId64 ", notified:%" PRId64 "", vgId, pBuf->commitIndex, + commitIndex); + ret = 0; + goto _out; + } + + sDebug("vgId:%d, log buffer info. role: %d, term: %" PRId64 ". start index:%" PRId64 ", commit index:%" PRId64 + ", match index: %" PRId64 ", end index:%" PRId64 "", + pNode->vgId, role, term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); + + // execute in fsm + for (int64_t index = pBuf->commitIndex + 1; index <= upperIndex; index++) { + // get a log entry + pEntry = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf); + if (pEntry == NULL) { + goto _out; + } + + // execute it + if (!syncUtilUserCommit(pEntry->originalRpcType)) { + sInfo("vgId:%d, non-user msg in raft log entry. index: %" PRId64 ", term:%" PRId64 "", vgId, pEntry->index, + pEntry->term); + pBuf->commitIndex = index; + if (!inBuf) { + syncEntryDestroy(pEntry); + pEntry = NULL; + } + continue; + } + + if (syncLogFsmExecute(pFsm, role, term, pEntry) != 0) { + sError("vgId:%d, failed to execute raft entry in FSM. log index:%" PRId64 ", term:%" PRId64 "", vgId, + pEntry->index, pEntry->term); + ret = -1; + goto _out; + } + pBuf->commitIndex = index; + + sDebug("vgId:%d, committed index: %" PRId64 ", term: %" PRId64 ", role: %d, current term: %" PRId64 "", pNode->vgId, + pEntry->index, pEntry->term, role, term); + + if (!inBuf) { + syncEntryDestroy(pEntry); + pEntry = NULL; + } + } + + // recycle + SyncIndex used = pBuf->endIndex - pBuf->startIndex; + SyncIndex until = pBuf->commitIndex - (pBuf->size - used) / 2; + for (SyncIndex index = pBuf->startIndex; index < until; index++) { + SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem; + ASSERT(pEntry != NULL); + syncEntryDestroy(pEntry); + memset(&pBuf->entries[(index + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0])); + pBuf->startIndex = index + 1; + } + +_out: + // mark as restored if needed + if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex) { + pNode->pFsm->FpRestoreFinishCb(pNode->pFsm); + pNode->restoreFinish = true; + sInfo("vgId:%d, restore finished. pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, + pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); + } + + if (!inBuf) { + syncEntryDestroy(pEntry); + pEntry = NULL; + } + syncLogBufferValidate(pBuf); + taosThreadMutexUnlock(&pBuf->mutex); + return ret; +} + +int32_t syncLogResetLogReplMgr(SSyncLogReplMgr* pMgr) { + ASSERT(pMgr->startIndex >= 0); + for (SyncIndex index = pMgr->startIndex; index < pMgr->endIndex; index++) { + memset(&pMgr->states[index % pMgr->size], 0, sizeof(pMgr->states[0])); + } + pMgr->startIndex = 0; + pMgr->matchIndex = 0; + pMgr->endIndex = 0; + pMgr->restored = false; + pMgr->retryBackoff = 0; + return 0; +} + +_Atomic int64_t tsRetryCnt = 0; + +int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { + if (pMgr->endIndex <= pMgr->startIndex) { + return 0; + } + + int32_t ret = -1; + bool retried = false; + int64_t retryWaitMs = syncLogGetRetryBackoffTimeMs(pMgr); + + for (SyncIndex index = pMgr->startIndex; index < pMgr->endIndex; index++) { + int64_t pos = index % pMgr->size; + ASSERT(!pMgr->states[pos].barrier || (index == pMgr->startIndex || index + 1 == pMgr->endIndex)); + if (pMgr->states[pos].acked) { + continue; + } + int64_t nowMs = taosGetMonoTimestampMs(); + if (nowMs < pMgr->states[pos].timeMs + retryWaitMs) { + break; + } + + SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; + bool barrier = false; + SyncTerm term = -1; + if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { + sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId, + terrstr(), index, pDestId->addr); + goto _out; + } + ASSERT(barrier == pMgr->states[pos].barrier); + pMgr->states[pos].timeMs = nowMs; + pMgr->states[pos].term = term; + pMgr->states[pos].acked = false; + retried = true; + tsRetryCnt++; + } + + ret = 0; +_out: + if (retried) { + pMgr->retryBackoff = syncLogGetNextRetryBackoff(pMgr); + } + return ret; +} + +int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, + SyncAppendEntriesReply* pMsg) { + SSyncLogBuffer* pBuf = pNode->pLogBuf; + SRaftId destId = pMsg->srcId; + ASSERT(pMgr->restored == false); + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + + if (pMgr->endIndex == 0) { + ASSERT(pMgr->startIndex == 0); + ASSERT(pMgr->matchIndex == 0); + if (pMsg->matchIndex < 0) { + pMgr->restored = true; + sInfo("vgId:%d, sync log repl mgr of peer %s:%d restored. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64 + "), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", + pNode->vgId, host, port, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, + pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); + return 0; + } + } else { + if (pMsg->lastSendIndex < pMgr->startIndex || pMsg->lastSendIndex >= pMgr->endIndex) { + syncLogReplMgrRetryOnNeed(pMgr, pNode); + return 0; + } + + pMgr->states[pMsg->lastSendIndex % pMgr->size].acked = true; + + if (pMsg->matchIndex == pMsg->lastSendIndex) { + pMgr->restored = true; + sInfo("vgId:%d, sync log repl mgr of peer %s:%d restored. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64 + "), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", + pNode->vgId, host, port, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, + pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); + return 0; + } + + (void)syncLogResetLogReplMgr(pMgr); + } + + // send match index + SyncIndex index = TMIN(pMsg->matchIndex, pNode->pLogBuf->matchIndex); + bool barrier = false; + SyncTerm term = -1; + ASSERT(index >= 0); + if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, &destId, &barrier) < 0) { + sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId, + terrstr(), index, destId.addr); + return -1; + } + + int64_t nowMs = taosGetMonoTimestampMs(); + pMgr->states[index % pMgr->size].barrier = barrier; + pMgr->states[index % pMgr->size].timeMs = nowMs; + pMgr->states[index % pMgr->size].term = term; + pMgr->states[index % pMgr->size].acked = false; + + pMgr->matchIndex = index; + pMgr->startIndex = index; + pMgr->endIndex = index + 1; + return 0; +} + +int32_t syncLogReplMgrProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg) { + SSyncLogBuffer* pBuf = pNode->pLogBuf; + taosThreadMutexLock(&pBuf->mutex); + if (pMsg->startTime != 0 && pMsg->startTime != pMgr->peerStartTime) { + sInfo("vgId:%d, reset sync log repl mgr in heartbeat. start time:%" PRId64 ", old start time:%" PRId64 "", + pNode->vgId, pMsg->startTime, pMgr->peerStartTime); + syncLogResetLogReplMgr(pMgr); + pMgr->peerStartTime = pMsg->startTime; + } + taosThreadMutexUnlock(&pBuf->mutex); + return 0; +} + +int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) { + SSyncLogBuffer* pBuf = pNode->pLogBuf; + taosThreadMutexLock(&pBuf->mutex); + if (pMsg->startTime != pMgr->peerStartTime) { + sInfo("vgId:%d, reset sync log repl mgr in append entries reply. start time:%" PRId64 ", old start time:%" PRId64 + "", + pNode->vgId, pMsg->startTime, pMgr->peerStartTime); + syncLogResetLogReplMgr(pMgr); + pMgr->peerStartTime = pMsg->startTime; + } + + if (pMgr->restored) { + (void)syncLogReplMgrProcessReplyInNormalMode(pMgr, pNode, pMsg); + } else { + (void)syncLogReplMgrProcessReplyInRecoveryMode(pMgr, pNode, pMsg); + } + taosThreadMutexUnlock(&pBuf->mutex); + return 0; +} + +int32_t syncLogBufferReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { + SSyncLogBuffer* pBuf = pNode->pLogBuf; + if (pMgr->restored) { + (void)syncLogReplMgrReplicateAttemptedOnce(pMgr, pNode); + } else { + (void)syncLogReplMgrReplicateProbeOnce(pMgr, pNode); + } + return 0; +} + +int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { + ASSERT(!pMgr->restored); + SyncIndex index = pNode->pLogBuf->matchIndex; + SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; + bool barrier = false; + SyncTerm term = -1; + if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { + sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId, + terrstr(), index, pDestId->addr); + return -1; + } + + SSyncLogBuffer* pBuf = pNode->pLogBuf; + sDebug("vgId:%d, attempted to probe the %d'th peer with msg of index:%" PRId64 " term: %" PRId64 + ". pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64 "), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 + ")", + pNode->vgId, pMgr->peerId, index, term, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, + pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); + return 0; +} + +_Atomic int64_t tsSendCnt = 0; + +int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { + ASSERT(pMgr->restored); + SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; + int32_t batchSize = TMAX(1, pMgr->size / 20); + int32_t count = 0; + int64_t nowMs = taosGetMonoTimestampMs(); + + for (SyncIndex index = pMgr->endIndex; index <= pNode->pLogBuf->matchIndex; index++) { + if (batchSize < count++ || pMgr->startIndex + pMgr->size <= index) { + break; + } + if (pMgr->startIndex + 1 < index && pMgr->states[(index - 1) % pMgr->size].barrier) { + break; + } + int64_t pos = index % pMgr->size; + SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; + bool barrier = false; + SyncTerm term = -1; + if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { + sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId, + terrstr(), index, pDestId->addr); + return -1; + } + pMgr->states[pos].barrier = barrier; + pMgr->states[pos].timeMs = nowMs; + pMgr->states[pos].term = term; + pMgr->states[pos].acked = false; + + pMgr->endIndex = index + 1; + tsSendCnt++; + if (barrier) { + break; + } + } + + SSyncLogBuffer* pBuf = pNode->pLogBuf; + sDebug("vgId:%d, attempted to replicate %d msgs to the %d'th peer. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64 + "), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", + pNode->vgId, count, pMgr->peerId, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, + pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); + syncLogReplMgrRetryOnNeed(pMgr, pNode); + return 0; +} + +int32_t syncLogReplMgrProcessReplyInNormalMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) { + ASSERT(pMgr->restored == true); + if (pMgr->startIndex <= pMsg->lastSendIndex && pMsg->lastSendIndex < pMgr->endIndex) { + pMgr->states[pMsg->lastSendIndex % pMgr->size].acked = true; + pMgr->matchIndex = TMAX(pMgr->matchIndex, pMsg->matchIndex); + for (SyncIndex index = pMgr->startIndex; index < pMgr->matchIndex; index++) { + memset(&pMgr->states[index % pMgr->size], 0, sizeof(pMgr->states[0])); + } + pMgr->startIndex = pMgr->matchIndex; + } + + return syncLogReplMgrReplicateAttemptedOnce(pMgr, pNode); +} + +SSyncLogReplMgr* syncLogReplMgrCreate() { + SSyncLogReplMgr* pMgr = taosMemoryCalloc(1, sizeof(SSyncLogReplMgr)); + if (pMgr == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + pMgr->size = sizeof(pMgr->states) / sizeof(pMgr->states[0]); + + ASSERT(pMgr->size == TSDB_SYNC_LOG_BUFFER_SIZE); + + return pMgr; + +_err: + taosMemoryFree(pMgr); + return NULL; +} + +void syncLogReplMgrDestroy(SSyncLogReplMgr* pMgr) { + if (pMgr == NULL) { + return; + } + (void)taosMemoryFree(pMgr); + return; +} + +int32_t syncNodeLogReplMgrInit(SSyncNode* pNode) { + for (int i = 0; i < TSDB_MAX_REPLICA; i++) { + ASSERT(pNode->logReplMgrs[i] == NULL); + pNode->logReplMgrs[i] = syncLogReplMgrCreate(); + pNode->logReplMgrs[i]->peerId = i; + ASSERT(pNode->logReplMgrs[i] != NULL && "Out of memory."); + } + return 0; +} + +void syncNodeLogReplMgrDestroy(SSyncNode* pNode) { + for (int i = 0; i < TSDB_MAX_REPLICA; i++) { + syncLogReplMgrDestroy(pNode->logReplMgrs[i]); + pNode->logReplMgrs[i] = NULL; + } +} + +SSyncLogBuffer* syncLogBufferCreate() { + SSyncLogBuffer* pBuf = taosMemoryCalloc(1, sizeof(SSyncLogBuffer)); + if (pBuf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + pBuf->size = sizeof(pBuf->entries) / sizeof(pBuf->entries[0]); + + ASSERT(pBuf->size == TSDB_SYNC_LOG_BUFFER_SIZE); + + if (taosThreadMutexInit(&pBuf->mutex, NULL) < 0) { + sError("failed to init log buffer mutex due to %s", strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + return pBuf; + +_err: + taosMemoryFree(pBuf); + return NULL; +} + +void syncLogBufferDestroy(SSyncLogBuffer* pBuf) { + if (pBuf == NULL) { + return; + } + (void)taosThreadMutexDestroy(&pBuf->mutex); + (void)taosMemoryFree(pBuf); + return; +} + +int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SyncIndex toIndex) { + ASSERT(pBuf->commitIndex < toIndex && toIndex <= pBuf->endIndex); + + SyncIndex index = pBuf->endIndex - 1; + while (index >= toIndex) { + SSyncRaftEntry* pEntry = pBuf->entries[index % pBuf->size].pItem; + if (pEntry != NULL) { + syncEntryDestroy(pEntry); + pEntry = NULL; + memset(&pBuf->entries[index % pBuf->size], 0, sizeof(pBuf->entries[0])); + } + index--; + } + pBuf->endIndex = toIndex; + pBuf->matchIndex = TMIN(pBuf->matchIndex, index); + ASSERT(index + 1 == toIndex); + return 0; +} + +int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) { + taosThreadMutexLock(&pBuf->mutex); + SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore); + ASSERT(lastVer == pBuf->matchIndex); + SyncIndex index = pBuf->endIndex - 1; + + (void)syncLogBufferRollback(pBuf, pBuf->matchIndex + 1); + + sInfo("vgId:%d, reset log buffer. pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, + pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); + + pBuf->endIndex = pBuf->matchIndex + 1; + + // reset repl mgr + for (int i = 0; i < pNode->replicaNum; i++) { + SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i]; + syncLogResetLogReplMgr(pMgr); + } + taosThreadMutexUnlock(&pBuf->mutex); + return 0; +} + +SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index, bool* pInBuf) { + SSyncRaftEntry* pEntry = NULL; + if (index >= pBuf->endIndex) { + return NULL; + } + if (index > pBuf->startIndex) { // startIndex might be dummy + *pInBuf = true; + pEntry = pBuf->entries[index % pBuf->size].pItem; + } else { + *pInBuf = false; + if (pNode->pLogStore->syncLogGetEntry(pNode->pLogStore, index, &pEntry) < 0) { + sError("vgId:%d, failed to get log entry since %s. index:%" PRId64 "", pNode->vgId, terrstr(), index); + } + } + return pEntry; +} + +bool syncLogReplMgrValidate(SSyncLogReplMgr* pMgr) { + ASSERT(pMgr->startIndex <= pMgr->endIndex); + for (SyncIndex index = pMgr->startIndex; index < pMgr->endIndex; index++) { + ASSERT(pMgr->states[(index + pMgr->size) % pMgr->size].barrier == false || index + 1 == pMgr->endIndex); + } + return true; +} + +int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, + SRaftId* pDestId, bool* pBarrier) { + SSyncRaftEntry* pEntry = NULL; + SRpcMsg msgOut = {0}; + bool inBuf = false; + int32_t ret = -1; + SyncTerm prevLogTerm = -1; + SSyncLogBuffer* pBuf = pNode->pLogBuf; + + pEntry = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf); + if (pEntry == NULL) { + sError("vgId:%d, failed to get raft entry for index: %" PRId64 "", pNode->vgId, index); + goto _err; + } + *pBarrier = syncLogIsReplicationBarrier(pEntry); + + prevLogTerm = syncLogReplMgrGetPrevLogTerm(pMgr, pNode, index); + if (prevLogTerm < 0) { + sError("vgId:%d, failed to get prev log term since %s. index: %" PRId64 "", pNode->vgId, terrstr(), index); + goto _err; + } + if (pTerm) *pTerm = pEntry->term; + + int32_t code = syncLogToAppendEntries(pNode, pEntry, prevLogTerm, &msgOut); + if (code < 0) { + sError("vgId:%d, failed to get append entries for index:%" PRId64 "", pNode->vgId, index); + goto _err; + } + + (void)syncNodeSendAppendEntries(pNode, pDestId, &msgOut); + ret = 0; + + sInfo("vgId:%d, replicate one msg index: %" PRId64 " term: %" PRId64 " prevterm: %" PRId64 " to dest: 0x%016" PRIx64, + pNode->vgId, pEntry->index, pEntry->term, prevLogTerm, pDestId->addr); + + if (!inBuf) { + syncEntryDestroy(pEntry); + pEntry = NULL; + } + return 0; + +_err: + rpcFreeCont(msgOut.pCont); + msgOut.pCont = NULL; + if (!inBuf) { + syncEntryDestroy(pEntry); + pEntry = NULL; + } + return -1; +} diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 7e19b9aa5c..33c24bcca8 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -22,6 +22,7 @@ #include "syncEnv.h" #include "syncIndexMgr.h" #include "syncInt.h" +#include "syncLogBuffer.h" #include "syncMessage.h" #include "syncRaftCfg.h" #include "syncRaftLog.h" @@ -704,327 +705,6 @@ static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { return ret; } -int32_t syncLogResetLogReplMgr(SSyncLogReplMgr* pMgr) { - ASSERT(pMgr->startIndex >= 0); - for (SyncIndex index = pMgr->startIndex; index < pMgr->endIndex; index++) { - memset(&pMgr->states[index % pMgr->size], 0, sizeof(pMgr->states[0])); - } - pMgr->startIndex = 0; - pMgr->matchIndex = 0; - pMgr->endIndex = 0; - pMgr->restored = false; - pMgr->retryBackoff = 0; - return 0; -} - -_Atomic int64_t tsRetryCnt = 0; - -int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { - if (pMgr->endIndex <= pMgr->startIndex) { - return 0; - } - - int32_t ret = -1; - bool retried = false; - int64_t retryWaitMs = syncLogGetRetryBackoffTimeMs(pMgr); - - for (SyncIndex index = pMgr->startIndex; index < pMgr->endIndex; index++) { - int64_t pos = index % pMgr->size; - ASSERT(!pMgr->states[pos].barrier || (index == pMgr->startIndex || index + 1 == pMgr->endIndex)); - if (pMgr->states[pos].acked) { - continue; - } - int64_t nowMs = taosGetMonoTimestampMs(); - if (nowMs < pMgr->states[pos].timeMs + retryWaitMs) { - break; - } - - SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; - bool barrier = false; - SyncTerm term = -1; - if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { - sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId, - terrstr(), index, pDestId->addr); - goto _out; - } - ASSERT(barrier == pMgr->states[pos].barrier); - pMgr->states[pos].timeMs = nowMs; - pMgr->states[pos].term = term; - pMgr->states[pos].acked = false; - retried = true; - tsRetryCnt++; - } - - ret = 0; -_out: - if (retried) { - pMgr->retryBackoff = syncLogGetNextRetryBackoff(pMgr); - } - return ret; -} - -int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, - SyncAppendEntriesReply* pMsg) { - SSyncLogBuffer* pBuf = pNode->pLogBuf; - SRaftId destId = pMsg->srcId; - ASSERT(pMgr->restored == false); - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - - if (pMgr->endIndex == 0) { - ASSERT(pMgr->startIndex == 0); - ASSERT(pMgr->matchIndex == 0); - if (pMsg->matchIndex < 0) { - pMgr->restored = true; - sInfo("vgId:%d, sync log repl mgr of peer %s:%d restored. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64 - "), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", - pNode->vgId, host, port, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, - pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); - return 0; - } - } else { - if (pMsg->lastSendIndex < pMgr->startIndex || pMsg->lastSendIndex >= pMgr->endIndex) { - syncLogReplMgrRetryOnNeed(pMgr, pNode); - return 0; - } - - pMgr->states[pMsg->lastSendIndex % pMgr->size].acked = true; - - if (pMsg->matchIndex == pMsg->lastSendIndex) { - pMgr->restored = true; - sInfo("vgId:%d, sync log repl mgr of peer %s:%d restored. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64 - "), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", - pNode->vgId, host, port, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, - pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); - return 0; - } - - (void)syncLogResetLogReplMgr(pMgr); - } - - // send match index - SyncIndex index = TMIN(pMsg->matchIndex, pNode->pLogBuf->matchIndex); - bool barrier = false; - SyncTerm term = -1; - ASSERT(index >= 0); - if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, &destId, &barrier) < 0) { - sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId, - terrstr(), index, destId.addr); - return -1; - } - - int64_t nowMs = taosGetMonoTimestampMs(); - pMgr->states[index % pMgr->size].barrier = barrier; - pMgr->states[index % pMgr->size].timeMs = nowMs; - pMgr->states[index % pMgr->size].term = term; - pMgr->states[index % pMgr->size].acked = false; - - pMgr->matchIndex = index; - pMgr->startIndex = index; - pMgr->endIndex = index + 1; - return 0; -} - -int32_t syncLogReplMgrProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg) { - SSyncLogBuffer* pBuf = pNode->pLogBuf; - taosThreadMutexLock(&pBuf->mutex); - if (pMsg->startTime != 0 && pMsg->startTime != pMgr->peerStartTime) { - sInfo("vgId:%d, reset sync log repl mgr in heartbeat. start time:%" PRId64 ", old start time:%" PRId64 "", - pNode->vgId, pMsg->startTime, pMgr->peerStartTime); - syncLogResetLogReplMgr(pMgr); - pMgr->peerStartTime = pMsg->startTime; - } - taosThreadMutexUnlock(&pBuf->mutex); - return 0; -} - -int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) { - SSyncLogBuffer* pBuf = pNode->pLogBuf; - taosThreadMutexLock(&pBuf->mutex); - if (pMsg->startTime != pMgr->peerStartTime) { - sInfo("vgId:%d, reset sync log repl mgr in append entries reply. start time:%" PRId64 ", old start time:%" PRId64 - "", - pNode->vgId, pMsg->startTime, pMgr->peerStartTime); - syncLogResetLogReplMgr(pMgr); - pMgr->peerStartTime = pMsg->startTime; - } - - if (pMgr->restored) { - (void)syncLogReplMgrProcessReplyInNormalMode(pMgr, pNode, pMsg); - } else { - (void)syncLogReplMgrProcessReplyInRecoveryMode(pMgr, pNode, pMsg); - } - taosThreadMutexUnlock(&pBuf->mutex); - return 0; -} - -int32_t syncLogBufferReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { - SSyncLogBuffer* pBuf = pNode->pLogBuf; - if (pMgr->restored) { - (void)syncLogReplMgrReplicateAttemptedOnce(pMgr, pNode); - } else { - (void)syncLogReplMgrReplicateProbeOnce(pMgr, pNode); - } - return 0; -} - -int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { - ASSERT(!pMgr->restored); - SyncIndex index = pNode->pLogBuf->matchIndex; - SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; - bool barrier = false; - SyncTerm term = -1; - if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { - sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId, - terrstr(), index, pDestId->addr); - return -1; - } - - SSyncLogBuffer* pBuf = pNode->pLogBuf; - sDebug("vgId:%d, attempted to probe the %d'th peer with msg of index:%" PRId64 " term: %" PRId64 - ". pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64 "), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 - ")", - pNode->vgId, pMgr->peerId, index, term, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, - pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); - return 0; -} - -_Atomic int64_t tsSendCnt = 0; - -int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { - ASSERT(pMgr->restored); - SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; - int32_t batchSize = TMAX(1, pMgr->size / 20); - int32_t count = 0; - int64_t nowMs = taosGetMonoTimestampMs(); - - for (SyncIndex index = pMgr->endIndex; index <= pNode->pLogBuf->matchIndex; index++) { - if (batchSize < count++ || pMgr->startIndex + pMgr->size <= index) { - break; - } - if (pMgr->startIndex + 1 < index && pMgr->states[(index - 1) % pMgr->size].barrier) { - break; - } - int64_t pos = index % pMgr->size; - SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; - bool barrier = false; - SyncTerm term = -1; - if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { - sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId, - terrstr(), index, pDestId->addr); - return -1; - } - pMgr->states[pos].barrier = barrier; - pMgr->states[pos].timeMs = nowMs; - pMgr->states[pos].term = term; - pMgr->states[pos].acked = false; - - pMgr->endIndex = index + 1; - tsSendCnt++; - if (barrier) { - break; - } - } - - SSyncLogBuffer* pBuf = pNode->pLogBuf; - sDebug("vgId:%d, attempted to replicate %d msgs to the %d'th peer. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64 - "), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", - pNode->vgId, count, pMgr->peerId, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, - pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); - syncLogReplMgrRetryOnNeed(pMgr, pNode); - return 0; -} - -int32_t syncLogReplMgrProcessReplyInNormalMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) { - ASSERT(pMgr->restored == true); - if (pMgr->startIndex <= pMsg->lastSendIndex && pMsg->lastSendIndex < pMgr->endIndex) { - pMgr->states[pMsg->lastSendIndex % pMgr->size].acked = true; - pMgr->matchIndex = TMAX(pMgr->matchIndex, pMsg->matchIndex); - for (SyncIndex index = pMgr->startIndex; index < pMgr->matchIndex; index++) { - memset(&pMgr->states[index % pMgr->size], 0, sizeof(pMgr->states[0])); - } - pMgr->startIndex = pMgr->matchIndex; - } - - return syncLogReplMgrReplicateAttemptedOnce(pMgr, pNode); -} - -SSyncLogReplMgr* syncLogReplMgrCreate() { - SSyncLogReplMgr* pMgr = taosMemoryCalloc(1, sizeof(SSyncLogReplMgr)); - if (pMgr == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - - pMgr->size = sizeof(pMgr->states) / sizeof(pMgr->states[0]); - - ASSERT(pMgr->size == TSDB_SYNC_LOG_BUFFER_SIZE); - - return pMgr; - -_err: - taosMemoryFree(pMgr); - return NULL; -} - -void syncLogReplMgrDestroy(SSyncLogReplMgr* pMgr) { - if (pMgr == NULL) { - return; - } - (void)taosMemoryFree(pMgr); - return; -} - -int32_t syncNodeLogReplMgrInit(SSyncNode* pNode) { - for (int i = 0; i < TSDB_MAX_REPLICA; i++) { - ASSERT(pNode->logReplMgrs[i] == NULL); - pNode->logReplMgrs[i] = syncLogReplMgrCreate(); - pNode->logReplMgrs[i]->peerId = i; - ASSERT(pNode->logReplMgrs[i] != NULL && "Out of memory."); - } - return 0; -} - -void syncNodeLogReplMgrDestroy(SSyncNode* pNode) { - for (int i = 0; i < TSDB_MAX_REPLICA; i++) { - syncLogReplMgrDestroy(pNode->logReplMgrs[i]); - pNode->logReplMgrs[i] = NULL; - } -} - -SSyncLogBuffer* syncLogBufferCreate() { - SSyncLogBuffer* pBuf = taosMemoryCalloc(1, sizeof(SSyncLogBuffer)); - if (pBuf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - - pBuf->size = sizeof(pBuf->entries) / sizeof(pBuf->entries[0]); - - ASSERT(pBuf->size == TSDB_SYNC_LOG_BUFFER_SIZE); - - if (taosThreadMutexInit(&pBuf->mutex, NULL) < 0) { - sError("failed to init log buffer mutex due to %s", strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - return pBuf; - -_err: - taosMemoryFree(pBuf); - return NULL; -} - -void syncLogBufferDestroy(SSyncLogBuffer* pBuf) { - if (pBuf == NULL) { - return; - } - (void)taosThreadMutexDestroy(&pBuf->mutex); - (void)taosMemoryFree(pBuf); - return; -} - // open/close -------------- SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode)); @@ -1895,47 +1575,6 @@ void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) { void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) { syncRespCleanRsp(pSyncNode->pSyncRespMgr); } -int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SyncIndex toIndex) { - ASSERT(pBuf->commitIndex < toIndex && toIndex <= pBuf->endIndex); - - SyncIndex index = pBuf->endIndex - 1; - while (index >= toIndex) { - SSyncRaftEntry* pEntry = pBuf->entries[index % pBuf->size].pItem; - if (pEntry != NULL) { - syncEntryDestroy(pEntry); - pEntry = NULL; - memset(&pBuf->entries[index % pBuf->size], 0, sizeof(pBuf->entries[0])); - } - index--; - } - pBuf->endIndex = toIndex; - pBuf->matchIndex = TMIN(pBuf->matchIndex, index); - ASSERT(index + 1 == toIndex); - return 0; -} - -int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) { - taosThreadMutexLock(&pBuf->mutex); - SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore); - ASSERT(lastVer == pBuf->matchIndex); - SyncIndex index = pBuf->endIndex - 1; - - (void)syncLogBufferRollback(pBuf, pBuf->matchIndex + 1); - - sInfo("vgId:%d, reset log buffer. pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, - pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); - - pBuf->endIndex = pBuf->matchIndex + 1; - - // reset repl mgr - for (int i = 0; i < pNode->replicaNum; i++) { - SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i]; - syncLogResetLogReplMgr(pMgr); - } - taosThreadMutexUnlock(&pBuf->mutex); - return 0; -} - void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { // maybe clear leader cache if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { @@ -2675,96 +2314,6 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) { return 0; } -int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf) { - taosThreadMutexLock(&pBuf->mutex); - int64_t index = pBuf->endIndex; - taosThreadMutexUnlock(&pBuf->mutex); - return index; -} - -int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry) { - taosThreadMutexLock(&pBuf->mutex); - syncLogBufferValidate(pBuf); - SyncIndex index = pEntry->index; - - if (index - pBuf->startIndex >= pBuf->size) { - sError("vgId:%d, failed to append due to log buffer full. index:%" PRId64 "", pNode->vgId, index); - goto _out; - } - - ASSERT(index == pBuf->endIndex); - - SSyncRaftEntry* pExist = pBuf->entries[index % pBuf->size].pItem; - ASSERT(pExist == NULL); - - // initial log buffer with at least one item, e.g. commitIndex - SSyncRaftEntry* pMatch = pBuf->entries[(index - 1 + pBuf->size) % pBuf->size].pItem; - ASSERT(pMatch != NULL && "no matched raft log entry"); - ASSERT(pMatch->index + 1 == index); - - SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = pMatch->index, .prevLogTerm = pMatch->term}; - pBuf->entries[index % pBuf->size] = tmp; - pBuf->endIndex = index + 1; - - syncLogBufferValidate(pBuf); - taosThreadMutexUnlock(&pBuf->mutex); - return 0; - -_out: - syncLogBufferValidate(pBuf); - syncEntryDestroy(pEntry); - taosThreadMutexUnlock(&pBuf->mutex); - return -1; -} - -SyncTerm syncLogReplMgrGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) { - SSyncLogBuffer* pBuf = pNode->pLogBuf; - SSyncRaftEntry* pEntry = NULL; - SyncIndex prevIndex = index - 1; - SyncTerm prevLogTerm = -1; - terrno = TSDB_CODE_SUCCESS; - - if (prevIndex == -1) return 0; - - if (index - 1 > pBuf->matchIndex) { - terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; - return -1; - } - - ASSERT(index - 1 == prevIndex); - - if (index - 1 >= pBuf->startIndex) { - pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem; - ASSERT(pEntry != NULL && "no log entry found"); - prevLogTerm = pBuf->entries[(index + pBuf->size) % pBuf->size].prevLogTerm; - return prevLogTerm; - } - - if (pMgr->startIndex <= prevIndex && prevIndex < pMgr->endIndex) { - int64_t timeMs = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].timeMs; - ASSERT(timeMs != 0 && "no log entry found"); - prevLogTerm = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].term; - ASSERT(prevIndex == 0 || prevLogTerm != 0); - return prevLogTerm; - } - - SSnapshot snapshot; - if (pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot) == 0 && prevIndex == snapshot.lastApplyIndex) { - return snapshot.lastApplyTerm; - } - - if (pNode->pLogStore->syncLogGetEntry(pNode->pLogStore, prevIndex, &pEntry) == 0) { - prevLogTerm = pEntry->term; - syncEntryDestroy(pEntry); - pEntry = NULL; - return prevLogTerm; - } - - sError("vgId:%d, failed to get log term since %s. index: %" PRId64 "", pNode->vgId, terrstr(), prevIndex); - terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; - return -1; -} - int32_t syncLogToAppendEntries(SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm, SRpcMsg* pRpcMsg) { uint32_t dataLen = pEntry->bytes; uint32_t bytes = sizeof(SyncAppendEntries) + dataLen; @@ -3014,7 +2563,7 @@ int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) { } bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) { - return false; + // return false; return (ths->replicaNum == 1 && syncUtilUserCommit(pMsg->msgType) && ths->vgId != 1); } diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 46c7649e02..e8709f95a2 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -16,6 +16,7 @@ #define _DEFAULT_SOURCE #include "syncReplication.h" #include "syncIndexMgr.h" +#include "syncLogBuffer.h" #include "syncRaftEntry.h" #include "syncRaftStore.h" #include "syncUtil.h" @@ -45,6 +46,8 @@ // mdest |-> j]) // /\ UNCHANGED <> +int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg); + int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapshot) { // next index SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId); diff --git a/source/os/src/osTimer.c b/source/os/src/osTimer.c index d2b8f3134a..d1c233ea9c 100644 --- a/source/os/src/osTimer.c +++ b/source/os/src/osTimer.c @@ -216,7 +216,7 @@ int64_t taosGetMonotonicMs() { #if 0 return getMonotonicUs() / 1000; #else - return taosGetMonoTimestampMs(); + return taosGetTimestampMs(); #endif } -- GitLab