From def4058eb113a2bf601aed2f12adabdc2b953e93 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Fri, 11 Nov 2022 22:55:21 +0800 Subject: [PATCH] feat: impl pipelining negotiation --- include/libs/sync/sync.h | 2 + include/os/osTime.h | 7 + include/util/tdef.h | 2 +- source/libs/sync/inc/syncInt.h | 67 +++- source/libs/sync/src/syncAppendEntries.c | 118 +++--- source/libs/sync/src/syncAppendEntriesReply.c | 111 +++--- source/libs/sync/src/syncIndexMgr.c | 9 + source/libs/sync/src/syncMain.c | 362 ++++++++++++++++-- source/libs/sync/src/syncReplication.c | 27 +- source/libs/transport/src/tmsgcb.c | 3 + 10 files changed, 584 insertions(+), 124 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 7ed8414906..a477cea93c 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -40,6 +40,8 @@ extern bool gRaftDetailLog; #define SYNC_MNODE_LOG_RETENTION 10000 #define SYNC_VNODE_LOG_RETENTION 500 +#define SYNC_MAX_RETRY_BACKOFF 5 +#define SYNC_LOG_REPL_RETRY_WAIT_MS 50 #define SYNC_APPEND_ENTRIES_TIMEOUT_MS 10000 #define SYNC_MAX_BATCH_SIZE 1 diff --git a/include/os/osTime.h b/include/os/osTime.h index 48f046d4d0..88eabd206d 100644 --- a/include/os/osTime.h +++ b/include/os/osTime.h @@ -82,6 +82,13 @@ static FORCE_INLINE int64_t taosGetTimestampNs() { return (int64_t)systemTime.tv_sec * 1000000000LL + (int64_t)systemTime.tv_nsec; } +//@return timestamp of monotonic clock in millisecond +static FORCE_INLINE int64_t taosGetMonoTimestampMs() { + struct timespec systemTime = {0}; + taosClockGetTime(CLOCK_MONOTONIC, &systemTime); + return (int64_t)systemTime.tv_sec * 1000LL + (int64_t)systemTime.tv_nsec / 1000000; +} + char *taosStrpTime(const char *buf, const char *fmt, struct tm *tm); struct tm *taosLocalTime(const time_t *timep, struct tm *result); struct tm *taosLocalTimeNolock(struct tm *result, const time_t *timep, int dst); diff --git a/include/util/tdef.h b/include/util/tdef.h index 0b7f9b28fa..c5776e8d87 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -281,7 +281,7 @@ typedef enum ELogicConditionType { #define TSDB_DNODE_ROLE_VNODE 2 #define TSDB_MAX_REPLICA 5 -#define TSDB_SYNC_LOG_BUFFER_SIZE 500 +#define TSDB_SYNC_LOG_BUFFER_SIZE 512 #define TSDB_TBNAME_COLUMN_INDEX (-1) #define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index bd97187ce7..58b32ed025 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -88,6 +88,60 @@ typedef struct SPeerState { int64_t lastSendTime; } SPeerState; +typedef struct SSyncReplInfo { + bool barrier; + bool acked; + int64_t timeMs; + int64_t term; +} SSyncReplInfo; + +typedef struct SSyncLogReplMgr { + SSyncReplInfo states[TSDB_SYNC_LOG_BUFFER_SIZE]; + int64_t startIndex; + int64_t matchIndex; + int64_t endIndex; + int64_t size; + bool restored; + int64_t peerStartTime; + int32_t retryBackoff; + int32_t peerId; +} SSyncLogReplMgr; + +SSyncLogReplMgr* syncLogReplMgrCreate(); +void syncLogReplMgrDestroy(SSyncLogReplMgr* pMgr); + +// access +static FORCE_INLINE int64_t syncLogGetRetryBackoffTimeMs(SSyncLogReplMgr* pMgr) { + return (1 << pMgr->retryBackoff) * SYNC_LOG_REPL_RETRY_WAIT_MS; +} + +static FORCE_INLINE int32_t syncLogGetNextRetryBackoff(SSyncLogReplMgr* pMgr) { + return TMIN(pMgr->retryBackoff + 1, SYNC_MAX_RETRY_BACKOFF); +} + +static FORCE_INLINE int32_t syncLogReplMgrUpdateTerm(SSyncLogReplMgr* pMgr, SyncIndex index, SyncTerm term) { + if (index < pMgr->startIndex || index >= pMgr->endIndex) { + return -1; + } + pMgr->states[(index + pMgr->size) % pMgr->size].term = term; + return 0; +} + +SyncTerm syncLogReplMgrGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index); +int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SRaftId* pDestId, + bool* pBarrier); +int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); +int32_t syncLogBufferReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode); +int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode); +int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode); +int32_t syncLogResetLogReplMgr(SSyncLogReplMgr* pMgr); +int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); +int32_t syncLogReplMgrProcessReplyInNormalMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); +int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode); + +// others +bool syncLogReplMgrValidate(SSyncLogReplMgr* pMgr); + typedef struct SSyncLogBufEntry { SSyncRaftEntry* pItem; SyncIndex prevLogIndex; @@ -115,14 +169,15 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt int64_t syncLogBufferLoad(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex); int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode); int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t commitIndex); +SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index, bool* pInBuf); int64_t syncNodeUpdateCommitIndex(SSyncNode* ths, SyncIndex commtIndex); -SyncAppendEntries* syncLogToAppendEntries(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index); +SyncAppendEntries* syncLogToAppendEntries(SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm); // private int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf); int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SyncIndex toIndex); -int32_t syncLogBufferReplicate(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index); +int32_t syncLogBufferReplicate(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm); void syncIndexMgrSetIndex(SSyncIndexMgr* pSyncIndexMgr, const SRaftId* pRaftId, SyncIndex index); bool syncNodeAgreedUpon(SSyncNode* pNode, SyncIndex index); @@ -225,11 +280,14 @@ typedef struct SSyncNode { SSyncRespMgr* pSyncRespMgr; // restore state - _Atomic bool restoreFinish; + bool restoreFinish; // SSnapshot* pSnapshot; SSyncSnapshotSender* senders[TSDB_MAX_REPLICA]; SSyncSnapshotReceiver* pNewNodeReceiver; + // log replication mgr + SSyncLogReplMgr* logReplMgrs[TSDB_MAX_REPLICA]; + SPeerState peerStates[TSDB_MAX_REPLICA]; // is config changing @@ -309,6 +367,9 @@ void syncNodeCandidate2Follower(SSyncNode* pSyncNode); void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId); void syncNodeVoteForSelf(SSyncNode* pSyncNode); +// log replication +SSyncLogReplMgr* syncNodeGetLogReplMgr(SSyncNode* pNode, SRaftId* pDestId); + // snapshot -------------- bool syncNodeHasSnapshot(SSyncNode* pSyncNode); void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 9560ea269b..285981012e 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -18,6 +18,7 @@ #include "syncRaftCfg.h" #include "syncRaftLog.h" #include "syncRaftStore.h" +#include "syncReplication.h" #include "syncSnapshot.h" #include "syncUtil.h" #include "syncVoteMgr.h" @@ -318,16 +319,17 @@ int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) { ASSERT(pNode->pFsm != NULL && "pFsm not registered"); ASSERT(pNode->pFsm->FpGetSnapshotInfo != NULL && "FpGetSnapshotInfo not registered"); - SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore); SSnapshot snapshot; if (pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot) < 0) { sError("vgId:%d, failed to get snapshot info since %s", pNode->vgId, terrstr()); goto _err; } + SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore); SyncIndex commitIndex = snapshot.lastApplyIndex; SyncTerm commitTerm = snapshot.lastApplyTerm; SyncIndex toIndex = TMAX(lastVer, commitIndex); + ASSERT(lastVer >= commitIndex); // update match index pBuf->commitIndex = commitIndex; @@ -392,7 +394,7 @@ int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) { } // update startIndex - pBuf->startIndex = index; + pBuf->startIndex = takeDummy ? index : index + 1; // validate syncLogBufferValidate(pBuf); @@ -491,18 +493,44 @@ int32_t syncLogBufferRollbackMatchIndex(SSyncLogBuffer* pBuf, SSyncNode* pNode, return 0; } +FORCE_INLINE SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf) { + SyncIndex index = pBuf->matchIndex; + SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem; + ASSERT(pEntry != NULL); + return pEntry->term; +} + int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm) { taosThreadMutexLock(&pBuf->mutex); syncLogBufferValidate(pBuf); - - int32_t ret = 0; + int32_t ret = -1; SyncIndex index = pEntry->index; SyncIndex prevIndex = pEntry->index - 1; - if (index <= pBuf->commitIndex || index - pBuf->startIndex > pBuf->size) { - sInfo("vgId:%d, cannot accept index:%" PRId64 " into log buffer. start index: %" PRId64 ", commit index: %" PRId64 - ", end index:%" PRId64 ")", - pNode->vgId, index, pBuf->startIndex, pBuf->commitIndex, pBuf->endIndex); - ret = (index <= pBuf->commitIndex) ? 0 : -1; + SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTerm(pBuf); + + if (index <= pBuf->commitIndex) { + sInfo("vgId:%d, raft entry already committed. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64 + " %" PRId64 " %" PRId64 ", %" PRId64 ")", + pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, + pBuf->endIndex); + ret = 0; + goto _out; + } + + if (index - pBuf->startIndex >= pBuf->size) { + sInfo("vgId:%d, raft entry out of buffer capacity. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64 + " %" PRId64 " %" PRId64 ", %" PRId64 ")", + pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, + pBuf->endIndex); + goto _out; + } + + if (index > pBuf->matchIndex && lastMatchTerm != prevTerm) { + sInfo("vgId:%d, not ready to accept raft entry (i.e. across barrier). index: %" PRId64 ", term: %" PRId64 + ": prevterm: %" PRId64 " /= lastmatch: %" PRId64 ". log buffer: [%" PRId64 " %" PRId64 " %" PRId64 + ", %" PRId64 ")", + pNode->vgId, pEntry->index, pEntry->term, prevTerm, lastMatchTerm, pBuf->startIndex, pBuf->commitIndex, + pBuf->matchIndex, pBuf->endIndex); goto _out; } @@ -511,14 +539,16 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt if (pExist != NULL) { ASSERT(pEntry->index == pExist->index); - if (pEntry->term > pExist->term) { + if (pEntry->term != pExist->term) { (void)syncLogBufferRollback(pBuf, index); } else { - sInfo("vgId:%d, %s raft entry received. index:%" PRId64 ", term: %" PRId64 "", pNode->vgId, - ((pEntry->term < pExist->term) ? "stale" : "duplicate"), pEntry->index, pEntry->term); + sInfo("vgId:%d, duplicate raft entry received. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64 + " %" PRId64 " %" PRId64 ", %" PRId64 ")", + pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, + pBuf->endIndex); SyncTerm existPrevTerm = pBuf->entries[index % pBuf->size].prevLogTerm; - ASSERT(pEntry->term < pExist->term || (pEntry->term == pExist->term && prevTerm == existPrevTerm)); - ret = (pEntry->term < pExist->term) ? 0 : -1; + ASSERT(pEntry->term == pExist->term && prevTerm == existPrevTerm); + ret = 0; goto _out; } } @@ -531,6 +561,9 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt // update end index pBuf->endIndex = TMAX(index + 1, pBuf->endIndex); + // success + ret = 0; + _out: syncEntryDestroy(pEntry); syncLogBufferValidate(pBuf); @@ -550,6 +583,7 @@ SSyncRaftEntry* syncLogAppendEntriesToRaftEntry(const SyncAppendEntries* pMsg) { } int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { + ASSERT(pEntry->index >= 0); SyncIndex lastVer = pLogStore->syncLogLastIndex(pLogStore); if (lastVer >= pEntry->index && pLogStore->syncLogTruncate(pLogStore, pEntry->index) < 0) { sError("failed to truncate log store since %s. from index:%" PRId64 "", terrstr(), pEntry->index); @@ -563,6 +597,9 @@ int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { pEntry->term); return -1; } + + lastVer = pLogStore->syncLogLastIndex(pLogStore); + ASSERT(pEntry->index == lastVer); return 0; } @@ -607,10 +644,14 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode) { goto _out; } + // increase match index + pBuf->matchIndex = index; + + sInfo("vgId:%d, log buffer proceed. start index: %" PRId64 ", match index: %" PRId64 ", end index: %" PRId64, + pNode->vgId, pBuf->startIndex, pBuf->matchIndex, pBuf->endIndex); + // replicate on demand - if (pNode->state == TAOS_SYNC_STATE_LEADER && pNode->replicaNum > 1) { - (void)syncLogBufferReplicate(pBuf, pNode, index); - } + (void)syncNodeReplicate(pNode); // persist if (syncLogStorePersist(pLogStore, pEntry) < 0) { @@ -618,16 +659,15 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode) { terrstr(), pEntry->index); goto _out; } - - // increment - pBuf->matchIndex = index; - matchIndex = pBuf->matchIndex; + ASSERT(pEntry->index == pBuf->matchIndex); // update my match index + matchIndex = pBuf->matchIndex; syncIndexMgrSetIndex(pNode->pMatchIndex, &pNode->myRaftId, pBuf->matchIndex); } // end of while _out: + pBuf->matchIndex = matchIndex; syncLogBufferValidate(pBuf); taosThreadMutexUnlock(&pBuf->mutex); return matchIndex; @@ -659,7 +699,7 @@ int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf) { ASSERT(pBuf->commitIndex <= pBuf->matchIndex); ASSERT(pBuf->matchIndex < pBuf->endIndex); ASSERT(pBuf->endIndex - pBuf->startIndex <= pBuf->size); - for (SyncIndex index = pBuf->commitIndex; index <= pBuf->matchIndex; index++) { + for (SyncIndex index = pBuf->startIndex; index <= pBuf->matchIndex; index++) { SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem; ASSERT(pEntry != NULL); } @@ -694,20 +734,11 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm // execute in fsm for (int64_t index = pBuf->commitIndex + 1; index <= upperIndex; index++) { // get a log entry - if (index >= pBuf->startIndex) { - inBuf = true; - pEntry = pBuf->entries[index % pBuf->size].pItem; - } else { - inBuf = false; - if (pLogStore->syncLogGetEntry(pLogStore, index, &pEntry) < 0) { - sError("vgId:%d, failed to get log entry since %s. index:%" PRId64 "", pNode->vgId, terrstr(), index); - ret = -1; - goto _out; - } + pEntry = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf); + if (pEntry == NULL) { + goto _out; } - ASSERT(pEntry != NULL); - // execute it if (!syncUtilUserCommit(pEntry->originalRpcType)) { sInfo("vgId:%d, non-user msg in raft log entry. index: %" PRId64 ", term:%" PRId64 "", vgId, pEntry->index, @@ -738,8 +769,8 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm } // recycle - // TODO: with a grace period of one third of free space before commitIndex in ring buffer - SyncIndex until = pBuf->commitIndex; + SyncIndex used = pBuf->endIndex - pBuf->startIndex; + SyncIndex until = pBuf->commitIndex - (pBuf->size - used) / 2; for (SyncIndex index = pBuf->startIndex; index < until; index++) { SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem; ASSERT(pEntry != NULL); @@ -796,9 +827,6 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) { syncNodeStepDown(ths, pMsg->term); syncNodeResetElectTimer(ths); - // update commit index - (void)syncNodeUpdateCommitIndex(ths, pMsg->commitIndex); - if (pMsg->dataLen < (int32_t)sizeof(SSyncRaftEntry)) { sError("vgId:%d, incomplete append entries received. prev index:%" PRId64 ", term:%" PRId64 ", datalen:%d", ths->vgId, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen); @@ -825,15 +853,17 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) { // accept if (syncLogBufferAccept(ths->pLogBuf, ths, pEntry, pMsg->prevLogTerm) < 0) { - sWarn("vgId:%d, failed to accept raft entry into log buffer. index:%" PRId64 ", term:%" PRId64, ths->vgId, - pEntry->index, pEntry->term); goto _SEND_RESPONSE; } - pReply->success = true; _SEND_RESPONSE: - // update match index pReply->matchIndex = syncLogBufferProceed(ths->pLogBuf, ths); + bool matched = (pReply->matchIndex >= pReply->lastSendIndex); + pReply->success = matched; + if (matched) { + // update commit index only after matching + (void)syncNodeUpdateCommitIndex(ths, pMsg->commitIndex); + } // ack, i.e. send response SRpcMsg rpcMsg; @@ -841,7 +871,7 @@ _SEND_RESPONSE: (void)syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); // commit index, i.e. leader notice me - if (syncLogBufferCommit(ths->pLogBuf, ths, pMsg->commitIndex) < 0) { + if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) { sError("vgId:%d, failed to commit raft fsm log since %s.", ths->vgId, terrstr()); goto _out; } diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index e37c40455c..86d8ec11b9 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -85,11 +85,11 @@ static void syncNodeStartSnapshotOnce(SSyncNode* ths, SyncIndex beginIndex, Sync } int64_t syncNodeUpdateCommitIndex(SSyncNode* ths, SyncIndex commitIndex) { - ths->commitIndex = TMAX(commitIndex, ths->commitIndex); SyncIndex lastVer = ths->pLogStore->syncLogLastIndex(ths->pLogStore); - commitIndex = TMIN(ths->commitIndex, lastVer); - ths->pLogStore->syncLogUpdateCommitIndex(ths->pLogStore, commitIndex); - return commitIndex; + commitIndex = TMAX(commitIndex, ths->commitIndex); + ths->commitIndex = TMIN(commitIndex, lastVer); + ths->pLogStore->syncLogUpdateCommitIndex(ths->pLogStore, ths->commitIndex); + return ths->commitIndex; } int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely) { @@ -102,50 +102,77 @@ int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely) { return ths->commitIndex; } -int32_t syncLogBufferCatchingUpReplicate(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex fromIndex, SRaftId destId) { - taosThreadMutexLock(&pBuf->mutex); +SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index, bool* pInBuf) { + SSyncRaftEntry* pEntry = NULL; + if (index >= pBuf->endIndex) { + return NULL; + } + if (index > pBuf->startIndex) { // startIndex might be dummy + *pInBuf = true; + pEntry = pBuf->entries[index % pBuf->size].pItem; + } else { + *pInBuf = false; + if (pNode->pLogStore->syncLogGetEntry(pNode->pLogStore, index, &pEntry) < 0) { + sError("vgId:%d, failed to get log entry since %s. index:%" PRId64 "", pNode->vgId, terrstr(), index); + } + } + return pEntry; +} + +bool syncLogReplMgrValidate(SSyncLogReplMgr* pMgr) { + ASSERT(pMgr->startIndex <= pMgr->endIndex); + for (SyncIndex index = pMgr->startIndex; index < pMgr->endIndex; index++) { + ASSERT(pMgr->states[(index + pMgr->size) % pMgr->size].barrier == false || index + 1 == pMgr->endIndex); + } + return true; +} + +static FORCE_INLINE bool syncLogIsReplicationBarrier(SSyncRaftEntry* pEntry) { + return pEntry->originalRpcType == TDMT_SYNC_NOOP; +} + +int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SRaftId* pDestId, + bool* pBarrier) { + SSyncRaftEntry* pEntry = NULL; SyncAppendEntries* pMsgOut = NULL; - SyncIndex index = fromIndex; + bool inBuf = false; + int32_t ret = -1; + SyncTerm prevLogTerm = -1; + SSyncLogBuffer* pBuf = pNode->pLogBuf; - if (pNode->state != TAOS_SYNC_STATE_LEADER || pNode->replicaNum <= 1) { + sInfo("vgId:%d, replicate one msg index: %" PRId64 " to dest: 0x%016" PRIx64, pNode->vgId, index, pDestId->addr); + + pEntry = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf); + if (pEntry == NULL) { + sError("vgId:%d, failed to get raft entry for index: %" PRId64 "", pNode->vgId, index); goto _out; } + *pBarrier = syncLogIsReplicationBarrier(pEntry); - if (index < pBuf->startIndex) { - sError("vgId:%d, (not implemented yet) replication fromIndex: %" PRId64 - " that is less than pBuf->startIndex: %" PRId64 ". destId: 0x%016" PRId64 "", - pNode->vgId, fromIndex, pBuf->startIndex, destId.addr); + prevLogTerm = syncLogReplMgrGetPrevLogTerm(pMgr, pNode, index); + if (prevLogTerm < 0 && terrno != TSDB_CODE_SUCCESS) { + sError("vgId:%d, failed to get prev log term since %s. index: %" PRId64 "", pNode->vgId, terrstr(), index); goto _out; } + (void)syncLogReplMgrUpdateTerm(pMgr, pEntry->index, pEntry->term); - if (index > pBuf->matchIndex) { + pMsgOut = syncLogToAppendEntries(pNode, pEntry, prevLogTerm); + if (pMsgOut == NULL) { + sError("vgId:%d, failed to get append entries for index:%" PRId64 "", pNode->vgId, index); goto _out; } - do { - pMsgOut = syncLogToAppendEntries(pBuf, pNode, index); - if (pMsgOut == NULL) { - sError("vgId:%d, failed to assembly append entries msg since %s. index: %" PRId64 "", pNode->vgId, terrstr(), - index); - goto _out; - } - - if (syncNodeSendAppendEntries(pNode, &destId, pMsgOut) < 0) { - sWarn("vgId:%d, failed to send append entries msg since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", - pNode->vgId, terrstr(), index, destId.addr); - goto _out; - } - - index += 1; - syncAppendEntriesDestroy(pMsgOut); - pMsgOut = NULL; - } while (false && index <= pBuf->commitIndex); + (void)syncNodeSendAppendEntries(pNode, pDestId, pMsgOut); + ret = 0; _out: syncAppendEntriesDestroy(pMsgOut); pMsgOut = NULL; - taosThreadMutexUnlock(&pBuf->mutex); - return 0; + if (!inBuf) { + syncEntryDestroy(pEntry); + pEntry = NULL; + } + return ret; } int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { @@ -185,23 +212,15 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMs SyncIndex indexLikely = TMIN(pMsg->matchIndex, ths->pLogBuf->matchIndex); SyncIndex commitIndex = syncNodeCheckCommitIndex(ths, indexLikely); (void)syncLogBufferCommit(ths->pLogBuf, ths, commitIndex); - } else { - SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); - if (nextIndex > SYNC_INDEX_BEGIN) { - --nextIndex; - } - syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex); } - // send next append entries - SPeerState* pState = syncNodeGetPeerState(ths, &(pMsg->srcId)); - ASSERT(pState != NULL); - - if (pMsg->lastSendIndex == pState->lastSendIndex) { - syncNodeReplicateOne(ths, &(pMsg->srcId)); + // replicate log + SSyncLogReplMgr* pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId); + ASSERT(pMgr != NULL); + if (pMgr != NULL) { + (void)syncLogReplMgrProcessReply(pMgr, ths, pMsg); } } - return 0; } diff --git a/source/libs/sync/src/syncIndexMgr.c b/source/libs/sync/src/syncIndexMgr.c index 8e78aeedc3..09137c31c7 100644 --- a/source/libs/sync/src/syncIndexMgr.c +++ b/source/libs/sync/src/syncIndexMgr.c @@ -82,6 +82,15 @@ void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, index); } +SSyncLogReplMgr *syncNodeGetLogReplMgr(SSyncNode *pNode, SRaftId *pDestId) { + for (int i = 0; i < pNode->replicaNum; i++) { + if (syncUtilSameId(&(pNode->replicasId[i]), pDestId)) { + return pNode->logReplMgrs[i]; + } + } + return NULL; +} + SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId) { if (pSyncIndexMgr == NULL) { return SYNC_INDEX_INVALID; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index c3267bafdc..f9f6760e8c 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1103,6 +1103,252 @@ int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { return ret; } +int32_t syncLogResetLogReplMgr(SSyncLogReplMgr* pMgr) { + ASSERT(pMgr->startIndex >= 0); + for (SyncIndex index = pMgr->startIndex; index < pMgr->endIndex; index++) { + memset(&pMgr->states[index % pMgr->size], 0, sizeof(pMgr->states[0])); + } + pMgr->startIndex = 0; + pMgr->matchIndex = 0; + pMgr->endIndex = 0; + pMgr->restored = false; + pMgr->retryBackoff = 0; + return 0; +} + +int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { + if (pMgr->endIndex <= pMgr->startIndex) { + return 0; + } + + int32_t ret = -1; + bool retried = false; + int64_t retryWaitMs = syncLogGetRetryBackoffTimeMs(pMgr); + + for (SyncIndex index = pMgr->startIndex; index < pMgr->endIndex; index++) { + int64_t pos = index % pMgr->size; + ASSERT(!pMgr->states[pos].barrier || (index == pMgr->startIndex || index + 1 == pMgr->endIndex)); + if (pMgr->states[pos].acked) { + continue; + } + int64_t nowMs = taosGetMonoTimestampMs(); + if (nowMs < pMgr->states[pos].timeMs + retryWaitMs) { + break; + } + + SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; + bool barrier = false; + if (syncLogBufferReplicateOneTo(pMgr, pNode, index, pDestId, &barrier) < 0) { + sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId, + terrstr(), index, pDestId->addr); + goto _out; + } + ASSERT(barrier == pMgr->states[pos].barrier); + pMgr->states[pos].timeMs = nowMs; + pMgr->states[pos].acked = false; + retried = true; + } + + ret = 0; +_out: + if (retried) { + pMgr->retryBackoff = syncLogGetNextRetryBackoff(pMgr); + } + return ret; +} + +int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, + SyncAppendEntriesReply* pMsg) { + SRaftId destId = pMsg->srcId; + ASSERT(pMgr->restored == false); + + if (pMgr->endIndex == 0) { + ASSERT(pMgr->startIndex == 0); + ASSERT(pMgr->matchIndex == 0); + if (pMsg->matchIndex < 0) { + pMgr->restored = true; + return 0; + } + } else { + if (pMsg->lastSendIndex < pMgr->startIndex || pMsg->lastSendIndex >= pMgr->endIndex) { + syncLogReplMgrRetryOnNeed(pMgr, pNode); + return 0; + } + + pMgr->states[pMsg->lastSendIndex % pMgr->size].acked = true; + + if (pMsg->matchIndex == pMsg->lastSendIndex) { + pMgr->restored = true; + return 0; + } + + (void)syncLogResetLogReplMgr(pMgr); + } + + SyncIndex index = TMIN(pMsg->matchIndex, pNode->pLogBuf->matchIndex); + bool barrier = false; + ASSERT(index >= 0); + // send match index + if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &destId, &barrier) < 0) { + sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId, + terrstr(), index, destId.addr); + return -1; + } + + int64_t nowMs = taosGetMonoTimestampMs(); + pMgr->states[index % pMgr->size].barrier = barrier; + pMgr->states[index % pMgr->size].timeMs = nowMs; + pMgr->states[index % pMgr->size].acked = false; + + pMgr->matchIndex = index; + pMgr->startIndex = index; + pMgr->endIndex = index + 1; + return 0; +} + +int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) { + SSyncLogBuffer* pBuf = pNode->pLogBuf; + taosThreadMutexLock(&pBuf->mutex); + if (pMsg->startTime != pMgr->peerStartTime) { + syncLogResetLogReplMgr(pMgr); + pMgr->peerStartTime = pMsg->startTime; + } + + if (pMgr->restored) { + (void)syncLogReplMgrProcessReplyInNormalMode(pMgr, pNode, pMsg); + } else { + (void)syncLogReplMgrProcessReplyInRecoveryMode(pMgr, pNode, pMsg); + } + taosThreadMutexUnlock(&pBuf->mutex); + return 0; +} + +int32_t syncLogBufferReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { + SSyncLogBuffer* pBuf = pNode->pLogBuf; + if (pMgr->restored) { + (void)syncLogReplMgrReplicateAttemptedOnce(pMgr, pNode); + } else { + (void)syncLogReplMgrReplicateProbeOnce(pMgr, pNode); + } + return 0; +} + +int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { + ASSERT(!pMgr->restored); + SyncIndex index = pNode->pLogBuf->matchIndex; + SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; + bool barrier = false; + if (syncLogBufferReplicateOneTo(pMgr, pNode, index, pDestId, &barrier) < 0) { + sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId, + terrstr(), index, pDestId->addr); + return -1; + } + + SSyncLogBuffer* pBuf = pNode->pLogBuf; + sInfo("vgId:%d, attempted to probe the %d'th peer. pMgr(restored:%d): [%" PRId64 " %" PRId64 ", %" PRId64 + "), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", + pNode->vgId, pMgr->peerId, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pBuf->startIndex, + pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); + return 0; +} + +int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { + ASSERT(pMgr->restored); + SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; + int32_t batchSize = TMAX(1, pMgr->size / 10); + int32_t count = 0; + + for (SyncIndex index = pMgr->endIndex; index <= pNode->pLogBuf->matchIndex; index++) { + if (batchSize < count++ || pMgr->startIndex + pMgr->size <= index) { + break; + } + if (pMgr->startIndex + 1 < index && pMgr->states[(index - 1) % pMgr->size].barrier) { + break; + } + int64_t pos = index % pMgr->size; + SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; + bool barrier = false; + if (syncLogBufferReplicateOneTo(pMgr, pNode, index, pDestId, &barrier) < 0) { + sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId, + terrstr(), index, pDestId->addr); + return -1; + } + pMgr->states[pos].barrier = barrier; + pMgr->states[pos].timeMs = taosGetMonoTimestampMs(); + pMgr->states[pos].acked = false; + + pMgr->endIndex = index + 1; + if (barrier) { + break; + } + } + + SSyncLogBuffer* pBuf = pNode->pLogBuf; + sInfo("vgId:%d, attempted to replicate %d msgs to the %d'th peer. pMgr(restored:%d): [%" PRId64 " %" PRId64 + ", %" PRId64 "), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", + pNode->vgId, count, pMgr->peerId, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, + pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); + syncLogReplMgrRetryOnNeed(pMgr, pNode); + return 0; +} + +int32_t syncLogReplMgrProcessReplyInNormalMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) { + ASSERT(pMgr->restored == true); + if (pMgr->startIndex <= pMsg->lastSendIndex && pMsg->lastSendIndex < pMgr->endIndex) { + pMgr->states[pMsg->lastSendIndex % pMgr->size].acked = true; + pMgr->matchIndex = TMAX(pMgr->matchIndex, pMsg->matchIndex); + for (SyncIndex index = pMgr->startIndex; index < pMgr->matchIndex; index++) { + memset(&pMgr->states[index % pMgr->size], 0, sizeof(pMgr->states[0])); + } + pMgr->startIndex = pMgr->matchIndex; + } + + return syncLogReplMgrReplicateAttemptedOnce(pMgr, pNode); +} + +SSyncLogReplMgr* syncLogReplMgrCreate() { + SSyncLogReplMgr* pMgr = taosMemoryCalloc(1, sizeof(SSyncLogReplMgr)); + if (pMgr == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + pMgr->size = sizeof(pMgr->states) / sizeof(pMgr->states[0]); + + ASSERT(pMgr->size == TSDB_SYNC_LOG_BUFFER_SIZE); + + return pMgr; + +_err: + taosMemoryFree(pMgr); + return NULL; +} + +void syncLogReplMgrDestroy(SSyncLogReplMgr* pMgr) { + if (pMgr == NULL) { + return; + } + (void)taosMemoryFree(pMgr); + return; +} + +int32_t syncNodeLogReplMgrInit(SSyncNode* pNode) { + for (int i = 0; i < TSDB_MAX_REPLICA; i++) { + ASSERT(pNode->logReplMgrs[i] == NULL); + pNode->logReplMgrs[i] = syncLogReplMgrCreate(); + pNode->logReplMgrs[i]->peerId = i; + ASSERT(pNode->logReplMgrs[i] != NULL && "Out of memory."); + } + return 0; +} + +void syncNodeLogReplMgrDestroy(SSyncNode* pNode) { + for (int i = 0; i < TSDB_MAX_REPLICA; i++) { + syncLogReplMgrDestroy(pNode->logReplMgrs[i]); + pNode->logReplMgrs[i] = NULL; + } +} + SSyncLogBuffer* syncLogBufferCreate() { SSyncLogBuffer* pBuf = taosMemoryCalloc(1, sizeof(SSyncLogBuffer)); if (pBuf == NULL) { @@ -1397,9 +1643,13 @@ SSyncNode* syncNodeOpen(SSyncInfo* pOldSyncInfo) { // is config changing pSyncNode->changing = false; + // replication mgr + syncNodeLogReplMgrInit(pSyncNode); + // peer state syncNodePeerStateInit(pSyncNode); + // // min match index pSyncNode->minMatchIndex = SYNC_INDEX_INVALID; @@ -1532,6 +1782,7 @@ void syncNodeClose(SSyncNode* pSyncNode) { ret = raftStoreClose(pSyncNode->pRaftStore); ASSERT(ret == 0); + syncNodeLogReplMgrDestroy(pSyncNode); syncRespMgrDestroy(pSyncNode->pSyncRespMgr); pSyncNode->pSyncRespMgr = NULL; voteGrantedDestroy(pSyncNode->pVotesGranted); @@ -2477,6 +2728,11 @@ int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) { pBuf->endIndex = pBuf->matchIndex + 1; + // reset repl mgr + for (int i = 0; i < pNode->replicaNum; i++) { + SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i]; + syncLogResetLogReplMgr(pMgr); + } taosThreadMutexUnlock(&pBuf->mutex); return 0; } @@ -2637,8 +2893,12 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) { syncNodeLog2("==state change syncNodeCandidate2Leader==", pSyncNode); - // Raft 3.6.2 Committing entries from previous terms - syncNodeAppendNoop(pSyncNode); + int32_t ret = syncNodeAppendNoop(pSyncNode); + ASSERT(ret == 0); + SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); + ASSERT(lastIndex >= 0); + sInfo("vgId:%d, become leader. term: %" PRId64 ", commit index: %" PRId64 ", last index: %" PRId64 "", + pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, lastIndex); } void syncNodeCandidate2LeaderOld(SSyncNode* pSyncNode) { @@ -2671,22 +2931,33 @@ int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) { void syncNodeFollower2Candidate(SSyncNode* pSyncNode) { ASSERT(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER); pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE; + SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); + sInfo("vgId:%d, become candidate from follower. term: %" PRId64 ", commit index: %" PRId64 ", last index: %" PRId64, + pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, lastIndex); - syncNodeEventLog(pSyncNode, "follower to candidate"); + // syncNodeEventLog(pSyncNode, "follower to candidate"); } void syncNodeLeader2Follower(SSyncNode* pSyncNode) { ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER); syncNodeBecomeFollower(pSyncNode, "leader to follower"); - syncNodeEventLog(pSyncNode, "leader to follower"); + SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); + sInfo("vgId:%d, become follower from leader. term: %" PRId64 ", commit index: %" PRId64 ", last index: %" PRId64, + pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, lastIndex); + + // syncNodeEventLog(pSyncNode, "leader to follower"); } void syncNodeCandidate2Follower(SSyncNode* pSyncNode) { ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE); syncNodeBecomeFollower(pSyncNode, "candidate to follower"); - syncNodeEventLog(pSyncNode, "candidate to follower"); + SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); + sInfo("vgId:%d, become follower from candidate. term: %" PRId64 ", commit index: %" PRId64 ", last index: %" PRId64, + pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, lastIndex); + + // syncNodeEventLog(pSyncNode, "candidate to follower"); } // raft vote -------------- @@ -3109,6 +3380,11 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) { // proceed match index, with replicating on needed SyncIndex matchIndex = syncLogBufferProceed(ths->pLogBuf, ths); + sInfo("vgId:%d, append raft log index: %" PRId64 ", term: %" PRId64 " log buffer: [%" PRId64 " %" PRId64 " %" PRId64 + ", %" PRId64 ")", + ths->vgId, pEntry->index, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, + ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex); + // multi replica if (ths->replicaNum > 1) { return 0; @@ -3135,7 +3411,8 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) { return -1; } - return syncNodeAppend(ths, pEntry); + int32_t ret = syncNodeAppend(ths, pEntry); + return 0; } static int32_t syncNodeAppendNoopOld(SSyncNode* ths) { @@ -3264,7 +3541,7 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt syncLogBufferValidate(pBuf); SyncIndex index = pEntry->index; - if (index - pBuf->startIndex > pBuf->size) { + if (index - pBuf->startIndex >= pBuf->size) { sError("vgId:%d, failed to append due to log buffer full. index:%" PRId64 "", pNode->vgId, index); goto _out; } @@ -3294,30 +3571,57 @@ _out: return -1; } -SyncTerm syncLogBufferGetTerm(SSyncLogBuffer* pBuf, SyncIndex index) { - ASSERT(pBuf->startIndex <= index && index < pBuf->endIndex); - SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem; - ASSERT(pEntry != NULL); - return pEntry->term; -} +SyncTerm syncLogReplMgrGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) { + SSyncLogBuffer* pBuf = pNode->pLogBuf; + SSyncRaftEntry* pEntry = NULL; + SyncIndex prevIndex = index - 1; + SyncTerm prevLogTerm = -1; + terrno = TSDB_CODE_SUCCESS; -SyncAppendEntries* syncLogToAppendEntries(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index) { - SyncAppendEntries* pMsg = NULL; + if (prevIndex == -1) return 0; - if (index < pBuf->startIndex || index >= pBuf->endIndex) { - sError("vgId:%d, log entry (%" PRId64 ") out of range of log buffer [%" PRId64 ", %" PRId64 ").", pNode->vgId, - index, pBuf->startIndex, pBuf->endIndex); - return pMsg; + if (index - 1 > pBuf->matchIndex) { + terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; + return -1; } - SSyncRaftEntry* pEntry = pBuf->entries[index % pBuf->size].pItem; - if (pEntry == NULL) { - sError("vgId:%d, log entry (%" PRId64 ") not exist in log buffer [%" PRId64 ", %" PRId64 ").", pNode->vgId, index, - pBuf->startIndex, pBuf->endIndex); - return pMsg; + ASSERT(index - 1 == prevIndex); + + if (index - 1 >= pBuf->startIndex) { + pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem; + ASSERT(pEntry != NULL && "no log entry found"); + prevLogTerm = pBuf->entries[(index + pBuf->size) % pBuf->size].prevLogTerm; + return prevLogTerm; + } + + if (pMgr->startIndex <= prevIndex && prevIndex < pMgr->endIndex) { + int64_t timeMs = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].timeMs; + ASSERT(timeMs != 0 && "no log entry found"); + prevLogTerm = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].term; + return prevLogTerm; + } + + SSnapshot snapshot; + if (pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot) == 0 && prevIndex == snapshot.lastApplyIndex) { + return snapshot.lastApplyTerm; } + if (pNode->pLogStore->syncLogGetEntry(pNode->pLogStore, prevIndex, &pEntry) == 0) { + prevLogTerm = pEntry->term; + syncEntryDestroy(pEntry); + pEntry = NULL; + return prevLogTerm; + } + + sError("vgId:%d, failed to get log term since %s. index: %" PRId64 "", pNode->vgId, terrstr(), prevIndex); + terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; + return -1; +} + +SyncAppendEntries* syncLogToAppendEntries(SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm) { + SyncAppendEntries* pMsg = NULL; uint32_t datalen = pEntry->bytes; + pMsg = syncAppendEntriesBuild(datalen, pNode->vgId); if (pMsg == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -3326,8 +3630,8 @@ SyncAppendEntries* syncLogToAppendEntries(SSyncLogBuffer* pBuf, SSyncNode* pNode (void)memcpy(pMsg->data, pEntry, datalen); - pMsg->prevLogIndex = index - 1; - pMsg->prevLogTerm = syncLogBufferGetTerm(pBuf, pMsg->prevLogIndex); + pMsg->prevLogIndex = pEntry->index - 1; + pMsg->prevLogTerm = prevLogTerm; pMsg->vgId = pNode->vgId; pMsg->srcId = pNode->myRaftId; pMsg->term = pNode->pRaftStore->currentTerm; @@ -3345,10 +3649,10 @@ void syncLogReplicateAppendEntries(SSyncNode* pNode, SyncAppendEntries* pMsg) { } } -int32_t syncLogBufferReplicate(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index) { - SyncAppendEntries* pMsgOut = syncLogToAppendEntries(pNode->pLogBuf, pNode, index); +int32_t syncLogBufferReplicate(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm) { + SyncAppendEntries* pMsgOut = syncLogToAppendEntries(pNode, pEntry, prevLogTerm); if (pMsgOut == NULL) { - sError("vgId:%d, failed to get append entries for index:%" PRId64 "", pNode->vgId, index); + sError("vgId:%d, failed to get append entries for index:%" PRId64 "", pNode->vgId, pEntry->index); goto _err; } diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 181b9f2b74..3dcd2d8cdf 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -136,7 +136,21 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId) { return 0; } -int32_t syncNodeReplicate(SSyncNode* pSyncNode) { +int32_t syncNodeReplicate(SSyncNode* pNode) { + if (pNode->state != TAOS_SYNC_STATE_LEADER || pNode->replicaNum == 1) { + return -1; + } + for (int32_t i = 0; i < pNode->replicaNum; i++) { + if (syncUtilSameId(&pNode->replicasId[i], &pNode->myRaftId)) { + continue; + } + SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i]; + (void)syncLogBufferReplicateOnce(pMgr, pNode); + } + return 0; +} + +int32_t syncNodeReplicateOld(SSyncNode* pSyncNode) { if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { return -1; } @@ -159,6 +173,17 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode) { } int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, SRaftId* destRaftId, SyncAppendEntries* pMsg) { + sInfo("vgId:%d, send append entries msg index: %" PRId64 " to dest: 0x%016" PRId64, pSyncNode->vgId, + pMsg->prevLogIndex + 1, destRaftId->addr); + int32_t ret = 0; + pMsg->destId = *destRaftId; + SRpcMsg rpcMsg; + syncAppendEntries2RpcMsg(pMsg, &rpcMsg); + syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg); + return 0; +} + +int32_t syncNodeSendAppendEntriesOld(SSyncNode* pSyncNode, SRaftId* destRaftId, SyncAppendEntries* pMsg) { int32_t ret = 0; pMsg->destId = *destRaftId; diff --git a/source/libs/transport/src/tmsgcb.c b/source/libs/transport/src/tmsgcb.c index 1cd1903851..2007bc474f 100644 --- a/source/libs/transport/src/tmsgcb.c +++ b/source/libs/transport/src/tmsgcb.c @@ -23,6 +23,9 @@ static SMsgCb defaultMsgCb; void tmsgSetDefault(const SMsgCb* msgcb) { defaultMsgCb = *msgcb; } int32_t tmsgPutToQueue(const SMsgCb* msgcb, EQueueType qtype, SRpcMsg* pMsg) { + if (msgcb == NULL) { + return -1; + } int32_t code = (*msgcb->putToQueueFp)(msgcb->mgmt, qtype, pMsg); if (code != 0) { rpcFreeCont(pMsg->pCont); -- GitLab