提交 fa30b94c 编写于 作者: M Minghao Li

refactor(sync): append entries

上级 797d1324
......@@ -37,6 +37,8 @@ extern bool gRaftDetailLog;
......@@ -423,6 +423,7 @@ typedef struct SyncAppendEntriesReply {
SyncTerm privateTerm;
bool success;
SyncIndex matchIndex;
SyncIndex lastSendIndex;
int64_t startTime;
} SyncAppendEntriesReply;
......@@ -698,6 +699,7 @@ int32_t syncNodeOnSnapshotRspCb(SSyncNode* ths, SyncSnapshotRsp* pMsg);
// Message-handler entry points of the sync node. Each takes the node (ths) and one message;
// syncNodeOnClientRequest additionally takes pRetIndex, an out-parameter for an index.
int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg);
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg);
int32_t syncNodeOnClientRequest(SSyncNode* ths, SyncClientRequest* pMsg, SyncIndex* pRetIndex);
int32_t syncNodeOnRequestVote(SSyncNode* ths, SyncRequestVote* pMsg);
int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, SyncRequestVoteReply* pMsg);
int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg);
......@@ -407,7 +407,7 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
} else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
ASSERT(pSyncMsg != NULL);
code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL);
code = syncNodeOnClientRequest(pSyncNode, pSyncMsg, NULL);
} else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST_BATCH) {
......@@ -83,6 +83,11 @@ int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId de
int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
// Per-peer record of the most recent append-entries message sent to that peer.
typedef struct SPeerState {
// log index the last message was sent for (stored as the message's prevLogIndex + 1)
SyncIndex lastSendIndex;
// value of taosGetTimestampMs() taken when that message was sent
int64_t lastSendTime;
} SPeerState;
typedef struct SSyncNode {
// init by SSyncInfo
SyncGroupId vgId;
......@@ -186,6 +191,8 @@ typedef struct SSyncNode {
SSyncSnapshotSender* senders[TSDB_MAX_REPLICA];
SSyncSnapshotReceiver* pNewNodeReceiver;
SPeerState peerStates[TSDB_MAX_REPLICA];
// is config changing
bool changing;
......@@ -283,6 +290,8 @@ int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId);
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId);
SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId);
// Looks up the SPeerState slot of the replica whose id matches pDestId; NULL when none matches.
SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId);
// Tells whether pMsg should be sent to pDestId now, or held back because the same index was
// sent to that peer within SYNC_APPEND_ENTRIES_TIMEOUT_MS.
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg);
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta);
int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta);
......@@ -300,6 +309,8 @@ int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* p
int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode);
bool syncNodeIsMnode(SSyncNode* pSyncNode);
// Resets the peerStates slots (index to SYNC_INDEX_INVALID, time to 0); returns 0.
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode);
// Moves the node to follower state, first adopting newTerm when it exceeds the current term.
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm);
// trace log
void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s);
......@@ -61,9 +61,16 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode, bool isTimer);
int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg);
int32_t syncNodeAppendEntriesBatch(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntriesBatch* pMsg);
int32_t syncNodeHeartbeat(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncHeartbeat* pMsg);
int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode);
// Leader only: runs syncNodeDoAppendEntries for every peer; returns -1 when not leader.
int32_t syncNodeDoReplicate(SSyncNode* pSyncNode);
// Builds the append-entries message for pDestId from its next index and hands it to
// syncNodeMaybeSendAppendEntries.
int32_t syncNodeDoAppendEntries(SSyncNode* pSyncNode, SRaftId* pDestId);
// Sends pMsg unconditionally and records the sent index and send time in the peer's SPeerState.
int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg);
// Sends pMsg only when syncNodeNeedSendAppendEntries allows it.
int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg);
#ifdef __cplusplus
......@@ -1042,4 +1042,123 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
return ret;
// Placeholder handler: uses neither argument and always returns 0.
int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) {
  (void)ths;
  (void)pMsg;
  return 0;
}
\ No newline at end of file
// Handles an incoming append-entries message: prepares a reply, compares the sender's term and
// previous-log position with local state, stores the carried entry when the local one differs,
// follows the sender's commit index, and sends the reply back to the sender.
//
// NOTE(review): the braces in this text are unbalanced — closing braces (and possibly other
// statements) are missing from this extract, so the exact branch boundaries cannot be read off
// here. Confirm every note below against the full file.
// NOTE(review): no release of pReply, pAppendEntry or pLocalEntry is visible here — verify.
int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) {
// prepare response msg
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
pReply->srcId = ths->myRaftId;
pReply->destId = pMsg->srcId;
pReply->term = ths->pRaftStore->currentTerm;
// pessimistic default; switched to true at the "accept" step below
pReply->success = false;
pReply->matchIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
// echo of the index this message was sent for; the reply handler compares it with the
// sender-side SPeerState.lastSendIndex
pReply->lastSendIndex = pMsg->prevLogIndex + 1;
// assumes pNewNodeReceiver is non-NULL — TODO confirm
pReply->privateTerm = ths->pNewNodeReceiver->privateTerm;
pReply->startTime = ths->startTime;
// NOTE(review): the body of the stale-term check below is not visible in this extract
if (pMsg->term < ths->pRaftStore->currentTerm) {
// sender is ahead: report its term in the reply
if (pMsg->term > ths->pRaftStore->currentTerm) {
pReply->term = pMsg->term;
syncNodeStepDown(ths, pMsg->term);
SyncIndex startIndex = ths->pLogStore->syncLogBeginIndex(ths->pLogStore);
SyncIndex lastIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
// NOTE(review): the bodies of the prevLogIndex / prevLogTerm checks below are not visible here
if (pMsg->prevLogIndex > lastIndex) {
if (pMsg->prevLogIndex >= startIndex) {
SyncTerm myPreLogTerm = syncNodeGetPreTerm(ths, pMsg->prevLogIndex + 1);
if (myPreLogTerm != pMsg->prevLogTerm) {
// accept
pReply->success = true;
// a message with no payload carries no entry to store
bool hasAppendEntries = pMsg->dataLen > 0;
if (hasAppendEntries) {
SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
ASSERT(pAppendEntry != NULL);
SyncIndex appendIndex = pMsg->prevLogIndex + 1;
SSyncRaftEntry* pLocalEntry = NULL;
int32_t code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, appendIndex, &pLocalEntry);
ASSERT(code == 0);
if (pLocalEntry->term == pAppendEntry->term) {
// do nothing
} else {
// local entry at appendIndex has a different term: truncate from there, then append
code = ths->pLogStore->syncLogTruncate(ths->pLogStore, appendIndex);
ASSERT(code == 0);
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
ASSERT(code == 0);
// update match index
pReply->matchIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
// maybe update commit index, leader notice me
if (pMsg->commitIndex > ths->commitIndex) {
// has commit entry in local
if (pMsg->commitIndex <= ths->pLogStore->syncLogLastIndex(ths->pLogStore)) {
// advance commit index to snapshot first
SSnapshot snapshot;
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex > ths->commitIndex) {
SyncIndex commitBegin = ths->commitIndex;
SyncIndex commitEnd = snapshot.lastApplyIndex;
ths->commitIndex = snapshot.lastApplyIndex;
char eventLog[128];
// NOTE(review): the snprintf call below is cut off in this extract (its argument list
// continues on a line that is not shown)
snprintf(eventLog, sizeof(eventLog), "commit by snapshot from index:%" PRId64 " to index:%" PRId64, commitBegin,
syncNodeEventLog(ths, eventLog);
// entries in (commitIndex, pMsg->commitIndex] are the ones handed to syncNodeCommit
SyncIndex beginIndex = ths->commitIndex + 1;
SyncIndex endIndex = pMsg->commitIndex;
// update commit index
ths->commitIndex = pMsg->commitIndex;
// call back Wal
int32_t code = ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex);
ASSERT(code == 0);
code = syncNodeCommit(ths, beginIndex, endIndex, ths->state);
ASSERT(code == 0);
return 0;
// msg event log
syncLogSendAppendEntriesReply(ths, pReply, "");
// send response
SRpcMsg rpcMsg;
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
return 0;
\ No newline at end of file
......@@ -20,6 +20,7 @@
#include "syncRaftCfg.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncReplication.h"
#include "syncSnapshot.h"
#include "syncUtil.h"
#include "syncVoteMgr.h"
......@@ -415,4 +416,54 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
return 0;
// Placeholder handler: uses neither argument and always returns 0.
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
  (void)ths;
  (void)pMsg;
  return 0;
}
\ No newline at end of file
// Handles a reply to an append-entries message: rejects replies from unknown replicas, drops
// stale-term replies, steps down on a newer term, updates the peer's match/next index, and
// triggers the next append-entries send for that peer.
//
// Returns -1 for a reply from outside the raft group (when not stand-by) or with a newer term,
// 0 otherwise as far as the visible lines show.
// NOTE(review): the braces in this text are unbalanced — closing braces (and possibly other
// statements) are missing from this extract; confirm branch boundaries against the full file.
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
int32_t ret = 0;
// if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
syncLogRecvAppendEntriesReply(ths, pMsg, "maybe replica already dropped");
return -1;
// drop stale response
if (pMsg->term < ths->pRaftStore->currentTerm) {
syncLogRecvAppendEntriesReply(ths, pMsg, "drop stale response");
return 0;
if (ths->state == TAOS_SYNC_STATE_LEADER) {
// a peer reporting a newer term means this node must give up leadership
if (pMsg->term > ths->pRaftStore->currentTerm) {
syncLogRecvAppendEntriesReply(ths, pMsg, "error term");
syncNodeStepDown(ths, pMsg->term);
return -1;
ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
if (pMsg->success) {
SyncIndex oldMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
// only move the match index forward; next index follows it
if (pMsg->matchIndex > oldMatchIndex) {
syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex);
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1);
} else {
SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
// NOTE(review): as shown, nextIndex is written back unchanged; a statement adjusting it
// may be missing from this extract — confirm
if (nextIndex > SYNC_INDEX_BEGIN) {
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex);
// send next append entries
SPeerState* pState = syncNodeGetPeerState(ths, &(pMsg->srcId));
ASSERT(pState != NULL);
// continue only when the reply answers the most recent send recorded for this peer
if (pMsg->lastSendIndex == pState->lastSendIndex) {
syncNodeDoAppendEntries(ths, &(pMsg->srcId));
return 0;
\ No newline at end of file
......@@ -1219,6 +1219,9 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
// is config changing
pSyncNode->changing = false;
// peer state
// start in syncNodeStart
// start raft
// syncNodeBecomeFollower(pSyncNode);
......@@ -2331,6 +2334,32 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
// Reports whether the node counts as an mnode, which this code identifies by vgId == 1.
bool syncNodeIsMnode(SSyncNode* pSyncNode) {
  if (pSyncNode->vgId == 1) {
    return true;
  }
  return false;
}
// Resets the per-peer send bookkeeping in pSyncNode->peerStates: the loop runs over all
// TSDB_MAX_REPLICA slots, setting lastSendIndex to SYNC_INDEX_INVALID and lastSendTime to 0.
// Returns 0.
// NOTE(review): closing braces are missing from this extract.
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
pSyncNode->peerStates[i].lastSendIndex = SYNC_INDEX_INVALID;
pSyncNode->peerStates[i].lastSendTime = 0;
return 0;
// Steps the node down to follower.
//   newTerm: term to step down to; asserted to be >= the node's current term.
// When newTerm is greater, the stored term is updated first and the node becomes follower with
// a message naming the new term. When the terms are equal, the node becomes follower only if it
// is not one already.
// NOTE(review): closing braces are missing from this extract.
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) {
ASSERT(pSyncNode->pRaftStore->currentTerm <= newTerm);
if (pSyncNode->pRaftStore->currentTerm < newTerm) {
raftStoreSetTerm(pSyncNode->pRaftStore, newTerm);
// reason string passed on to syncNodeBecomeFollower
char tmpBuf[64];
snprintf(tmpBuf, sizeof(tmpBuf), "step down, update term to %" PRIu64, newTerm);
syncNodeBecomeFollower(pSyncNode, tmpBuf);
} else {
if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
syncNodeBecomeFollower(pSyncNode, "step down");
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
......@@ -2924,6 +2953,55 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncI
return ret;
// Handles a client request: builds a log entry for it at the log's write index and the current
// term, puts it in the entry cache, and — when this node is leader — appends it to the log.
//   pRetIndex: optional (may be NULL); receives the entry's index on the success path.
// Returns -1 when the log append fails, otherwise ret (0 in the visible lines).
//
// NOTE(review): the braces in this text are unbalanced and several branch bodies (replicate,
// commit, the else branches near the end) are not visible in this extract; confirm against the
// full file before relying on the notes below.
int32_t syncNodeOnClientRequest(SSyncNode* ths, SyncClientRequest* pMsg, SyncIndex* pRetIndex) {
int32_t ret = 0;
int32_t code = 0;
SyncIndex index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
SyncTerm term = ths->pRaftStore->currentTerm;
SSyncRaftEntry* pEntry = syncEntryBuild2((SyncClientRequest*)pMsg, term, index);
ASSERT(pEntry != NULL);
// h is the cache handle filled in by syncCacheEntry; it stays NULL if none is returned
LRUHandle* h = NULL;
syncCacheEntry(ths->pLogStore, pEntry, &h);
if (ths->state == TAOS_SYNC_STATE_LEADER) {
// append entry
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
if (code != 0) {
// del resp mgr, call FpCommitCb
return -1;
// if multi replica, start replicate right now
if (ths->replicaNum > 1) {
// if only myself, maybe commit right now
if (ths->replicaNum == 1) {
if (pRetIndex != NULL) {
if (ret == 0 && pEntry != NULL) {
*pRetIndex = pEntry->index;
} else {
// a non-NULL handle is released back to the log store's cache
if (h) {
taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
} else {
return ret;
int32_t syncNodeOnClientRequestBatchCb(SSyncNode* ths, SyncClientRequestBatch* pMsg) {
int32_t code = 0;
......@@ -3331,6 +3409,30 @@ SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) {
return pTimer;
// Finds the SPeerState slot for a replica.
// Scans replicasId[0 .. replicaNum) for an id equal to pDestId (via syncUtilSameId) and takes
// the peerStates slot at the same position. Returns NULL when no replica id matches.
// NOTE(review): closing braces are missing from this extract; whether the loop stops at the
// first match cannot be seen here.
SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) {
SPeerState* pState = NULL;
for (int i = 0; i < ths->replicaNum; ++i) {
if (syncUtilSameId(pDestId, &((ths->replicasId)[i]))) {
pState = &((ths->peerStates)[i]);
return pState;
// Decides whether an append-entries message should be sent to pDestId now.
// Returns false when the message targets the same index (prevLogIndex + 1) as the last one sent
// to this peer and less than SYNC_APPEND_ENTRIES_TIMEOUT_MS has passed since that send;
// returns true otherwise. Asserts that the peer has an SPeerState slot.
// NOTE(review): closing braces are missing from this extract.
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) {
SPeerState* pState = syncNodeGetPeerState(ths, pDestId);
ASSERT(pState != NULL);
SyncIndex sendIndex = pMsg->prevLogIndex + 1;
int64_t tsNow = taosGetTimestampMs();
// same index re-sent too soon: suppress it
if (pState->lastSendIndex == sendIndex && tsNow - pState->lastSendTime < SYNC_APPEND_ENTRIES_TIMEOUT_MS) {
return false;
return true;
bool syncNodeCanChange(SSyncNode* pSyncNode) {
if (pSyncNode->changing) {
sError("sync cannot change");
......@@ -478,6 +478,118 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode, bool isTimer) {
return ret;
// Builds one append-entries message for pDestId, based on that peer's next index, and passes it
// to syncNodeMaybeSendAppendEntries. Returns 0 on every path visible here.
//
// NOTE(review): the braces in this text are unbalanced — closing braces (and possibly other
// statements, e.g. cleanup of pEntry / serialized / pMsg) are missing from this extract.
// Confirm the notes below against the full file.
int32_t syncNodeDoAppendEntries(SSyncNode* pSyncNode, SRaftId* pDestId) {
// next index
SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId);
// maybe start snapshot
SyncIndex logStartIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
SyncIndex logEndIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
// next index outside [logStartIndex, logEndIndex]: nothing is sent from the log here
if (nextIndex < logStartIndex || nextIndex > logEndIndex) {
// start snapshot
return 0;
// pre index, pre term
SyncIndex preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex);
SyncTerm preLogTerm = syncNodeGetPreTerm(pSyncNode, nextIndex);
// prepare entry
SyncAppendEntries* pMsg = NULL;
// NOTE(review): pEntry has no initializer; it is only read after syncLogGetEntry returns 0
SSyncRaftEntry* pEntry;
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, nextIndex, &pEntry);
if (code == 0) {
ASSERT(pEntry != NULL);
// message payload is sized to hold the serialized entry
pMsg = syncAppendEntriesBuild(pEntry->bytes, pSyncNode->vgId);
// add pEntry into msg
uint32_t len;
char* serialized = syncEntrySerialize(pEntry, &len);
ASSERT(len == pEntry->bytes);
memcpy(pMsg->data, serialized, len);
} else {
if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
// no entry in log
pMsg = syncAppendEntriesBuild(0, pSyncNode->vgId);
} else {
// NOTE(review): on this path pMsg is still NULL in the visible lines, yet it is
// dereferenced below — check what the full file does here
syncNodeLog3("", pSyncNode);
// prepare msg
pMsg->srcId = pSyncNode->myRaftId;
pMsg->destId = *pDestId;
pMsg->term = pSyncNode->pRaftStore->currentTerm;
pMsg->prevLogIndex = preLogIndex;
pMsg->prevLogTerm = preLogTerm;
pMsg->commitIndex = pSyncNode->commitIndex;
pMsg->privateTerm = 0;
// pMsg->privateTerm = syncIndexMgrGetTerm(pSyncNode->pNextIndex, pDestId);
// send msg
syncNodeMaybeSendAppendEntries(pSyncNode, pDestId, pMsg);
return 0;
// Runs syncNodeDoAppendEntries for each of the node's peers (peersId[0 .. peersNum)).
// Returns -1 when the node is not leader. A failing peer is reported with sError; the final
// return visible here is 0 regardless of ret.
// NOTE(review): closing braces are missing from this extract.
int32_t syncNodeDoReplicate(SSyncNode* pSyncNode) {
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
return -1;
int32_t ret = 0;
for (int i = 0; i < pSyncNode->peersNum; ++i) {
SRaftId* pDestId = &(pSyncNode->peersId[i]);
ret = syncNodeDoAppendEntries(pSyncNode, pDestId);
if (ret != 0) {
// decode the peer address into host/port for the error message
char host[64];
int16_t port;
syncUtilU642Addr(pDestId->addr, host, sizeof(host), &port);
sError("vgId:%d, do append entries error for %s:%d", pSyncNode->vgId, host, port);
return 0;
// Sends pMsg to destRaftId unconditionally: logs the send, converts the message to an SRpcMsg,
// sends it via syncNodeSendMsgById, then records in the peer's SPeerState the index sent
// (prevLogIndex + 1) and the send time. Asserts that the peer has an SPeerState slot.
// Returns ret, which is 0 in the visible lines (the result of syncNodeSendMsgById is not used).
// NOTE(review): the closing brace is missing from this extract.
int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg) {
int32_t ret = 0;
syncLogSendAppendEntries(pSyncNode, pMsg, "");
SRpcMsg rpcMsg;
syncAppendEntries2RpcMsg(pMsg, &rpcMsg);
syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg);
SPeerState* pState = syncNodeGetPeerState(pSyncNode, destRaftId);
ASSERT(pState != NULL);
// remembered so syncNodeNeedSendAppendEntries can suppress a quick re-send of the same index
pState->lastSendIndex = pMsg->prevLogIndex + 1;
pState->lastSendTime = taosGetTimestampMs();
return ret;
// Sends pMsg to destRaftId only when syncNodeNeedSendAppendEntries says it is needed.
// Returns the result of syncNodeSendAppendEntries when a send happens, 0 otherwise.
// NOTE(review): closing braces are missing from this extract.
int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg) {
int32_t ret = 0;
if (syncNodeNeedSendAppendEntries(pSyncNode, destRaftId, pMsg)) {
ret = syncNodeSendAppendEntries(pSyncNode, destRaftId, pMsg);
return ret;
int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg) {
int32_t ret = 0;
syncLogSendAppendEntries(pSyncNode, pMsg, "");
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册