From fa30b94c67ed7fa1ddc2f916b6c022e11b3e127b Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Sun, 16 Oct 2022 12:07:02 +0800 Subject: [PATCH] refactor(sync): append entries --- include/libs/sync/sync.h | 2 + include/libs/sync/syncTools.h | 2 + source/dnode/vnode/src/vnd/vnodeSync.c | 2 +- source/libs/sync/inc/syncInt.h | 13 +- source/libs/sync/inc/syncReplication.h | 7 + source/libs/sync/src/syncAppendEntries.c | 121 +++++++++++++++++- source/libs/sync/src/syncAppendEntriesReply.c | 53 +++++++- source/libs/sync/src/syncMain.c | 102 +++++++++++++++ source/libs/sync/src/syncReplication.c | 112 ++++++++++++++++ 9 files changed, 410 insertions(+), 4 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 3011350aa4..c38a41983a 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -37,6 +37,8 @@ extern bool gRaftDetailLog; #define SYNC_MAX_RECV_TIME_RANGE_MS 1200 #define SYNC_ADD_QUORUM_COUNT 3 +#define SYNC_APPEND_ENTRIES_TIMEOUT_MS 10000 + #define SYNC_MAX_BATCH_SIZE 1 #define SYNC_INDEX_BEGIN 0 #define SYNC_INDEX_INVALID -1 diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index 48ce6c3d10..bd49384af0 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -423,6 +423,7 @@ typedef struct SyncAppendEntriesReply { SyncTerm privateTerm; bool success; SyncIndex matchIndex; + SyncIndex lastSendIndex; int64_t startTime; } SyncAppendEntriesReply; @@ -698,6 +699,7 @@ int32_t syncNodeOnSnapshotRspCb(SSyncNode* ths, SyncSnapshotRsp* pMsg); int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg); int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg); +int32_t syncNodeOnClientRequest(SSyncNode* ths, SyncClientRequest* pMsg, SyncIndex* pRetIndex); int32_t syncNodeOnRequestVote(SSyncNode* ths, SyncRequestVote* pMsg); int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, SyncRequestVoteReply* pMsg); int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 746e0959a8..061cd9cd3a 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -407,7 +407,7 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg); ASSERT(pSyncMsg != NULL); - code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL); + code = syncNodeOnClientRequest(pSyncNode, pSyncMsg, NULL); syncClientRequestDestroy(pSyncMsg); } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST_BATCH) { diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 4ca2985ebb..c670185533 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -83,6 +83,11 @@ int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId de int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer); int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer); +typedef struct SPeerState { + SyncIndex lastSendIndex; + int64_t lastSendTime; +} SPeerState; + typedef struct SSyncNode { // init by SSyncInfo SyncGroupId vgId; @@ -186,6 +191,8 @@ typedef struct SSyncNode { SSyncSnapshotSender* senders[TSDB_MAX_REPLICA]; SSyncSnapshotReceiver* pNewNodeReceiver; + SPeerState peerStates[TSDB_MAX_REPLICA]; + // is config changing bool changing; @@ -283,6 +290,8 @@ int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg); bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId); SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId); SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId); +SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId); +bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg); int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta); int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta); @@ -299,7 +308,9 @@ int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* p int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode); -bool syncNodeIsMnode(SSyncNode* pSyncNode); +bool syncNodeIsMnode(SSyncNode* pSyncNode); +int32_t syncNodePeerStateInit(SSyncNode* pSyncNode); +void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm); // trace log void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s); diff --git a/source/libs/sync/inc/syncReplication.h b/source/libs/sync/inc/syncReplication.h index 58083997f9..d0f273dc9c 100644 --- a/source/libs/sync/inc/syncReplication.h +++ b/source/libs/sync/inc/syncReplication.h @@ -61,9 +61,16 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode, bool isTimer); int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg); int32_t syncNodeAppendEntriesBatch(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntriesBatch* pMsg); +//--------------------------------------------- + int32_t syncNodeHeartbeat(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncHeartbeat* pMsg); int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode); +int32_t syncNodeDoReplicate(SSyncNode* pSyncNode); +int32_t syncNodeDoAppendEntries(SSyncNode* pSyncNode, SRaftId* pDestId); +int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg); +int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index b8bb6e5da7..da59ab9dd9 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -1042,4 +1042,123 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs return ret; } -int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) { return 0; } \ No newline at end of file +int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) { + // prepare response msg + SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); + pReply->srcId = ths->myRaftId; + pReply->destId = pMsg->srcId; + pReply->term = ths->pRaftStore->currentTerm; + pReply->success = false; + pReply->matchIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore); + pReply->lastSendIndex = pMsg->prevLogIndex + 1; + pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; + pReply->startTime = ths->startTime; + + if (pMsg->term < ths->pRaftStore->currentTerm) { + goto _SEND_RESPONSE; + } + + if (pMsg->term > ths->pRaftStore->currentTerm) { + pReply->term = pMsg->term; + goto _SEND_RESPONSE; + } + + syncNodeStepDown(ths, pMsg->term); + syncNodeResetElectTimer(ths); + + SyncIndex startIndex = ths->pLogStore->syncLogBeginIndex(ths->pLogStore); + SyncIndex lastIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore); + + if (pMsg->prevLogIndex > lastIndex) { + goto _SEND_RESPONSE; + } + + if (pMsg->prevLogIndex >= startIndex) { + SyncTerm myPreLogTerm = syncNodeGetPreTerm(ths, pMsg->prevLogIndex + 1); + ASSERT(myPreLogTerm != SYNC_TERM_INVALID); + + if (myPreLogTerm != pMsg->prevLogTerm) { + goto _SEND_RESPONSE; + } + } + + // accept + pReply->success = true; + bool hasAppendEntries = pMsg->dataLen > 0; + if (hasAppendEntries) { + SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); + ASSERT(pAppendEntry != NULL); + + SyncIndex appendIndex = pMsg->prevLogIndex + 1; + SSyncRaftEntry* pLocalEntry = NULL; + int32_t code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, appendIndex, &pLocalEntry); + ASSERT(code == 0); + + if (pLocalEntry->term == pAppendEntry->term) { + // do nothing + } else { + code = ths->pLogStore->syncLogTruncate(ths->pLogStore, appendIndex); + ASSERT(code == 0); + + code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); + ASSERT(code == 0); + } + + syncEntryDestory(pLocalEntry); + syncEntryDestory(pAppendEntry); + } + + // update match index + pReply->matchIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore); + + // maybe update commit index, leader notice me + if (pMsg->commitIndex > ths->commitIndex) { + // has commit entry in local + if (pMsg->commitIndex <= ths->pLogStore->syncLogLastIndex(ths->pLogStore)) { + // advance commit index to sanpshot first + SSnapshot snapshot; + ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot); + if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex > ths->commitIndex) { + SyncIndex commitBegin = ths->commitIndex; + SyncIndex commitEnd = snapshot.lastApplyIndex; + ths->commitIndex = snapshot.lastApplyIndex; + + char eventLog[128]; + snprintf(eventLog, sizeof(eventLog), "commit by snapshot from index:%" PRId64 " to index:%" PRId64, commitBegin, + commitEnd); + syncNodeEventLog(ths, eventLog); + } + + SyncIndex beginIndex = ths->commitIndex + 1; + SyncIndex endIndex = pMsg->commitIndex; + + // update commit index + ths->commitIndex = pMsg->commitIndex; + + // call back Wal + int32_t code = ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex); + ASSERT(code == 0); + + code = syncNodeCommit(ths, beginIndex, endIndex, ths->state); + ASSERT(code == 0); + } + } + + goto _SEND_RESPONSE; + +_IGNORE: + syncAppendEntriesReplyDestroy(pReply); + return 0; + +_SEND_RESPONSE: + // msg event log + syncLogSendAppendEntriesReply(ths, pReply, ""); + + // send response + SRpcMsg rpcMsg; + syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); + syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); + syncAppendEntriesReplyDestroy(pReply); + + return 0; +} \ No newline at end of file diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 97d8fdd626..2f56c142ab 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -20,6 +20,7 @@ #include "syncRaftCfg.h" #include "syncRaftLog.h" #include "syncRaftStore.h" +#include "syncReplication.h" #include "syncSnapshot.h" #include "syncUtil.h" #include "syncVoteMgr.h" @@ -415,4 +416,54 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries return 0; } -int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { return 0; } \ No newline at end of file +int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { + int32_t ret = 0; + + // if already drop replica, do not process + if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { + syncLogRecvAppendEntriesReply(ths, pMsg, "maybe replica already dropped"); + return -1; + } + + // drop stale response + if (pMsg->term < ths->pRaftStore->currentTerm) { + syncLogRecvAppendEntriesReply(ths, pMsg, "drop stale response"); + return 0; + } + + if (ths->state == TAOS_SYNC_STATE_LEADER) { + if (pMsg->term > ths->pRaftStore->currentTerm) { + syncLogRecvAppendEntriesReply(ths, pMsg, "error term"); + syncNodeStepDown(ths, pMsg->term); + return -1; + } + + ASSERT(pMsg->term == ths->pRaftStore->currentTerm); + + if (pMsg->success) { + SyncIndex oldMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId)); + if (pMsg->matchIndex > oldMatchIndex) { + syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex); + syncMaybeAdvanceCommitIndex(ths); + } + syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1); + + } else { + SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); + if (nextIndex > SYNC_INDEX_BEGIN) { + --nextIndex; + } + syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex); + } + + // send next append entries + SPeerState* pState = syncNodeGetPeerState(ths, &(pMsg->srcId)); + ASSERT(pState != NULL); + + if (pMsg->lastSendIndex == pState->lastSendIndex) { + syncNodeDoAppendEntries(ths, &(pMsg->srcId)); + } + } + + return 0; +} \ No newline at end of file diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 3732dce70a..12aedbe5d1 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1219,6 +1219,9 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { // is config changing pSyncNode->changing = false; + // peer state + syncNodePeerStateInit(pSyncNode); + // start in syncNodeStart // start raft // syncNodeBecomeFollower(pSyncNode); @@ -2331,6 +2334,32 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) { bool syncNodeIsMnode(SSyncNode* pSyncNode) { return (pSyncNode->vgId == 1); } +int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) { + for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { + pSyncNode->peerStates[i].lastSendIndex = SYNC_INDEX_INVALID; + pSyncNode->peerStates[i].lastSendTime = 0; + } + + return 0; +} + +void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) { + ASSERT(pSyncNode->pRaftStore->currentTerm <= newTerm); + + if (pSyncNode->pRaftStore->currentTerm < newTerm) { + raftStoreSetTerm(pSyncNode->pRaftStore, newTerm); + char tmpBuf[64]; + snprintf(tmpBuf, sizeof(tmpBuf), "step down, update term to %" PRIu64, newTerm); + syncNodeBecomeFollower(pSyncNode, tmpBuf); + raftStoreClearVote(pSyncNode->pRaftStore); + + } else { + if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) { + syncNodeBecomeFollower(pSyncNode, "step down"); + } + } +} + void syncNodeFollower2Candidate(SSyncNode* pSyncNode) { ASSERT(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER); pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE; @@ -2924,6 +2953,55 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncI return ret; } +int32_t syncNodeOnClientRequest(SSyncNode* ths, SyncClientRequest* pMsg, SyncIndex* pRetIndex) { + int32_t ret = 0; + int32_t code = 0; + + SyncIndex index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore); + SyncTerm term = ths->pRaftStore->currentTerm; + SSyncRaftEntry* pEntry = syncEntryBuild2((SyncClientRequest*)pMsg, term, index); + ASSERT(pEntry != NULL); + + LRUHandle* h = NULL; + syncCacheEntry(ths->pLogStore, pEntry, &h); + + if (ths->state == TAOS_SYNC_STATE_LEADER) { + // append entry + code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry); + if (code != 0) { + // del resp mgr, call FpCommitCb + ASSERT(0); + return -1; + } + + // if mulit replica, start replicate right now + if (ths->replicaNum > 1) { + syncNodeDoReplicate(ths); + } + + // if only myself, maybe commit right now + if (ths->replicaNum == 1) { + syncMaybeAdvanceCommitIndex(ths); + } + } + + if (pRetIndex != NULL) { + if (ret == 0 && pEntry != NULL) { + *pRetIndex = pEntry->index; + } else { + *pRetIndex = SYNC_INDEX_INVALID; + } + } + + if (h) { + taosLRUCacheRelease(ths->pLogStore->pCache, h, false); + } else { + syncEntryDestory(pEntry); + } + + return ret; +} + int32_t syncNodeOnClientRequestBatchCb(SSyncNode* ths, SyncClientRequestBatch* pMsg) { int32_t code = 0; @@ -3331,6 +3409,30 @@ SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) { return pTimer; } +SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) { + SPeerState* pState = NULL; + for (int i = 0; i < ths->replicaNum; ++i) { + if (syncUtilSameId(pDestId, &((ths->replicasId)[i]))) { + pState = &((ths->peerStates)[i]); + } + } + return pState; +} + +bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) { + SPeerState* pState = syncNodeGetPeerState(ths, pDestId); + ASSERT(pState != NULL); + + SyncIndex sendIndex = pMsg->prevLogIndex + 1; + int64_t tsNow = taosGetTimestampMs(); + + if (pState->lastSendIndex == sendIndex && tsNow - pState->lastSendTime < SYNC_APPEND_ENTRIES_TIMEOUT_MS) { + return false; + } + + return true; +} + bool syncNodeCanChange(SSyncNode* pSyncNode) { if (pSyncNode->changing) { sError("sync cannot change"); diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 8acdb5cea1..fc293b9149 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -478,6 +478,118 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode, bool isTimer) { return ret; } +int32_t syncNodeDoAppendEntries(SSyncNode* pSyncNode, SRaftId* pDestId) { + // next index + SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId); + + // maybe start snapshot + SyncIndex logStartIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore); + SyncIndex logEndIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore); + if (nextIndex < logStartIndex || nextIndex > logEndIndex) { + // start snapshot + return 0; + } + + // pre index, pre term + SyncIndex preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex); + SyncTerm preLogTerm = syncNodeGetPreTerm(pSyncNode, nextIndex); + + // prepare entry + SyncAppendEntries* pMsg = NULL; + + SSyncRaftEntry* pEntry; + int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, nextIndex, &pEntry); + + if (code == 0) { + ASSERT(pEntry != NULL); + + pMsg = syncAppendEntriesBuild(pEntry->bytes, pSyncNode->vgId); + ASSERT(pMsg != NULL); + + // add pEntry into msg + uint32_t len; + char* serialized = syncEntrySerialize(pEntry, &len); + ASSERT(len == pEntry->bytes); + memcpy(pMsg->data, serialized, len); + + taosMemoryFree(serialized); + syncEntryDestory(pEntry); + + } else { + if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) { + // no entry in log + pMsg = syncAppendEntriesBuild(0, pSyncNode->vgId); + ASSERT(pMsg != NULL); + + } else { + syncNodeLog3("", pSyncNode); + ASSERT(0); + } + } + + // prepare msg + ASSERT(pMsg != NULL); + pMsg->srcId = pSyncNode->myRaftId; + pMsg->destId = *pDestId; + pMsg->term = pSyncNode->pRaftStore->currentTerm; + pMsg->prevLogIndex = preLogIndex; + pMsg->prevLogTerm = preLogTerm; + pMsg->commitIndex = pSyncNode->commitIndex; + pMsg->privateTerm = 0; + // pMsg->privateTerm = syncIndexMgrGetTerm(pSyncNode->pNextIndex, pDestId); + + // send msg + syncNodeMaybeSendAppendEntries(pSyncNode, pDestId, pMsg); + syncAppendEntriesDestroy(pMsg); + + return 0; +} + +int32_t syncNodeDoReplicate(SSyncNode* pSyncNode) { + if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { + return -1; + } + + int32_t ret = 0; + for (int i = 0; i < pSyncNode->peersNum; ++i) { + SRaftId* pDestId = &(pSyncNode->peersId[i]); + ret = syncNodeDoAppendEntries(pSyncNode, pDestId); + if (ret != 0) { + char host[64]; + int16_t port; + syncUtilU642Addr(pDestId->addr, host, sizeof(host), &port); + sError("vgId:%d, do append entries error for %s:%d", pSyncNode->vgId, host, port); + } + } + + return 0; +} + +int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg) { + int32_t ret = 0; + syncLogSendAppendEntries(pSyncNode, pMsg, ""); + + SRpcMsg rpcMsg; + syncAppendEntries2RpcMsg(pMsg, &rpcMsg); + syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg); + + SPeerState* pState = syncNodeGetPeerState(pSyncNode, destRaftId); + ASSERT(pState != NULL); + + pState->lastSendIndex = pMsg->prevLogIndex + 1; + pState->lastSendTime = taosGetTimestampMs(); + + return ret; +} + +int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg) { + int32_t ret = 0; + if (syncNodeNeedSendAppendEntries(pSyncNode, destRaftId, pMsg)) { + ret = syncNodeSendAppendEntries(pSyncNode, destRaftId, pMsg); + } + return ret; +} + int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg) { int32_t ret = 0; syncLogSendAppendEntries(pSyncNode, pMsg, ""); -- GitLab