提交 af27303a 编写于 作者: M Minghao Li

enh(sync): update raft core functions

上级 01b04e03
......@@ -40,6 +40,7 @@ extern "C" {
int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode);
int32_t syncNodeRequestVotePeersSnapshot(SSyncNode* pSyncNode);
int32_t syncNodeElect(SSyncNode* pSyncNode);
int32_t syncNodeRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg);
......@@ -225,6 +225,8 @@ SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index);
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index);
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm);
int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag);
// for debug --------------
void syncNodePrint(SSyncNode* pObj);
void syncNodePrint2(char* s, SSyncNode* pObj);
......@@ -433,71 +433,133 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
return ret;
static bool syncNodeOnAppendEntriesLogOK(SSyncNode* pSyncNode, SyncAppendEntries* pMsg) {
if (pMsg->prevLogIndex == SYNC_INDEX_INVALID) {
return true;
SyncTerm myPreLogTerm = syncNodeGetPreTerm(pSyncNode, pMsg->prevLogIndex + 1);
SyncIndex myLastIndex = syncNodeGetLastIndex(pSyncNode);
if (pMsg->prevLogIndex <= myLastIndex && pMsg->prevLogTerm == myPreLogTerm) {
return true;
return false;
static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg, SSyncRaftEntry** ppAppendEntry) {
int32_t code;
*ppAppendEntry = NULL;
// not conflict by default
bool conflict = false;
SyncIndex extraIndex = pMsg->prevLogIndex + 1;
SSyncRaftEntry* pExtraEntry;
code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, extraIndex, &pExtraEntry);
ASSERT(pExtraEntry != NULL);
*ppAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
ASSERT(*ppAppendEntry != NULL);
// log not match, conflict, need delete
ASSERT(extraIndex == (*ppAppendEntry)->index);
if (pExtraEntry->term != (*ppAppendEntry)->term) {
conflict = true;
if (conflict) {
// roll back
SyncIndex delBegin = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
SyncIndex delEnd = extraIndex;
sTrace("entry conflict:%d, delBegin:%ld, delEnd:%ld", conflict, delBegin, delEnd);
// notice! reverse roll back!
for (SyncIndex index = delEnd; index >= delBegin; --index) {
if (ths->pFsm->FpRollBackCb != NULL) {
SSyncRaftEntry* pRollBackEntry;
code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, index, &pRollBackEntry);
ASSERT(code == 0);
ASSERT(pRollBackEntry != NULL);
if (syncUtilUserRollback(pRollBackEntry->msgType)) {
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);
SFsmCbMeta cbMeta;
cbMeta.index = pRollBackEntry->index;
cbMeta.isWeak = pRollBackEntry->isWeak;
cbMeta.code = 0;
cbMeta.state = ths->state;
cbMeta.seqNum = pRollBackEntry->seqNum;
ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, cbMeta);
// delete confict entries
code = ths->pLogStore->syncLogTruncate(ths->pLogStore, extraIndex);
ASSERT(code == 0);
return 0;
static int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry) {
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg);
if (ths->pFsm != NULL) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
SFsmCbMeta cbMeta;
cbMeta.index = pEntry->index;
cbMeta.isWeak = pEntry->isWeak;
cbMeta.code = 2;
cbMeta.state = ths->state;
cbMeta.seqNum = pEntry->seqNum;
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
return 0;
int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
int32_t ret = 0;
int32_t code = 0;
// print log
char logBuf[128] = {0};
snprintf(logBuf, sizeof(logBuf), "==syncNodeOnAppendEntriesSnapshotCb== term:%lu", ths->pRaftStore->currentTerm);
snprintf(logBuf, sizeof(logBuf), "recv SyncAppendEntries, term:%lu", ths->pRaftStore->currentTerm);
syncAppendEntriesLog2(logBuf, pMsg);
// maybe update term
if (pMsg->term > ths->pRaftStore->currentTerm) {
syncNodeUpdateTerm(ths, pMsg->term);
assert(pMsg->term <= ths->pRaftStore->currentTerm);
ASSERT(pMsg->term <= ths->pRaftStore->currentTerm);
// reset elect timer
if (pMsg->term == ths->pRaftStore->currentTerm) {
ths->leaderCache = pMsg->srcId;
assert(pMsg->dataLen >= 0);
SyncIndex localPreLogIndex;
SyncTerm localPreLogTerm;
bool logOK;
SyncIndex logFirstIndex = logStoreFirstIndex(ths->pLogStore);
SSnapshot snapshot;
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);
if (logFirstIndex > snapshot.lastApplyIndex) {
logOK = false;
} else if (syncNodeIsIndexInSnapshot(ths, pMsg->prevLogIndex)) {
SSnapshot snapshot;
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);
ASSERT(pMsg->dataLen >= 0);
// maybe this assert is error, because replica take takesnapshot separately
// leader will reset next index to newest
ASSERT(pMsg->prevLogIndex == snapshot.lastApplyIndex);
bool logOK = syncNodeOnAppendEntriesLogOK(ths, pMsg);
logOK = (pMsg->prevLogIndex == snapshot.lastApplyIndex) && (pMsg->prevLogTerm == snapshot.lastApplyTerm);
"1 - logOK:%d, pMsg->prevLogIndex:%ld, snapshot.lastApplyIndex:%ld, pMsg->prevLogTerm:%lu, "
logOK, pMsg->prevLogIndex, snapshot.lastApplyIndex, pMsg->prevLogTerm, snapshot.lastApplyTerm);
} else {
ret = syncNodeGetPreIndexTerm(ths, pMsg->prevLogIndex + 1, &localPreLogIndex, &localPreLogTerm);
ASSERT(ret == 0);
logOK = (pMsg->prevLogIndex == SYNC_INDEX_INVALID) ||
((pMsg->prevLogIndex >= SYNC_INDEX_BEGIN) &&
(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) &&
(pMsg->prevLogTerm == localPreLogTerm));
sTrace("2 - logOK:%d, pMsg->prevLogIndex:%ld, getLastIndex:%ld, pMsg->prevLogTerm:%lu, localPreLogTerm:%lu", logOK,
pMsg->prevLogIndex, ths->pLogStore->getLastIndex(ths->pLogStore), pMsg->prevLogTerm, localPreLogTerm);
// reject request
// case1, reject request
if ((pMsg->term < ths->pRaftStore->currentTerm) ||
((pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK)) {
"syncNodeOnAppendEntriesSnapshotCb --> reject, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, "
"ths->state:%d, "
pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK);
sTrace("recv SyncAppendEntries, reject, receive_term:%lu, current_term:%lu, ths->state:%d, logOK:%d", pMsg->term,
ths->pRaftStore->currentTerm, ths->state, logOK);
// send response
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
pReply->srcId = ths->myRaftId;
pReply->destId = pMsg->srcId;
......@@ -513,26 +575,9 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
return ret;
SyncIndex localLastIndex;
SyncTerm localLastTerm;
if (logFirstIndex == SYNC_INDEX_INVALID) {
localLastIndex = ths->pLogStore->getLastIndex(ths->pLogStore);
localLastTerm = ths->pLogStore->getLastTerm(ths->pLogStore);
} else if (logFirstIndex > snapshot.lastApplyIndex) {
localLastIndex = ths->pLogStore->getLastIndex(ths->pLogStore);
localLastTerm = ths->pLogStore->getLastTerm(ths->pLogStore);
} else {
ret = syncNodeGetLastIndexTerm(ths, &localLastIndex, &localLastTerm);
ASSERT(ret == 0);
// return to follower state
// case 2, return to follower state
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) {
"syncNodeOnAppendEntriesSnapshotCb --> return to follower, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, "
"ths->state:%d, logOK:%d",
sTrace("recv SyncAppendEntries, return to follower, receive_term:%lu, current_term:%lu, ths->state:%d, logOK:%d",
pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK);
......@@ -541,99 +586,35 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
return ret;
// accept request
// case 3, accept request
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_FOLLOWER && logOK) {
// preIndex = -1, or has preIndex entry in local log
assert(pMsg->prevLogIndex <= localLastIndex);
// has extra entries (> preIndex) in local log
bool hasExtraEntries = pMsg->prevLogIndex < localLastIndex;
SyncIndex myLastIndex = syncNodeGetLastIndex(ths);
bool hasExtraEntries = myLastIndex > pMsg->prevLogIndex;
// has entries in SyncAppendEntries msg
bool hasAppendEntries = pMsg->dataLen > 0;
"syncNodeOnAppendEntriesSnapshotCb --> accept, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, "
"ths->state:%d, "
"logOK:%d, hasExtraEntries:%d, hasAppendEntries:%d",
"recv SyncAppendEntries, accept, receive_term:%lu, current_term:%lu, ths->state:%d, logOK:%d, "
"hasExtraEntries:%d, hasAppendEntries:%d",
pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK, hasExtraEntries, hasAppendEntries);
if (hasExtraEntries && hasAppendEntries) {
// not conflict by default
bool conflict = false;
SyncIndex extraIndex = pMsg->prevLogIndex + 1;
SSyncRaftEntry* pExtraEntry = ths->pLogStore->getEntry(ths->pLogStore, extraIndex);
assert(pExtraEntry != NULL);
SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
assert(pAppendEntry != NULL);
// log not match, conflict
assert(extraIndex == pAppendEntry->index);
if (pExtraEntry->term != pAppendEntry->term) {
conflict = true;
if (conflict) {
// roll back
SyncIndex delBegin = ths->pLogStore->getLastIndex(ths->pLogStore);
SyncIndex delEnd = extraIndex;
sTrace("syncNodeOnAppendEntriesSnapshotCb --> conflict:%d, delBegin:%ld, delEnd:%ld", conflict, delBegin,
// notice! reverse roll back!
for (SyncIndex index = delEnd; index >= delBegin; --index) {
if (ths->pFsm->FpRollBackCb != NULL) {
SSyncRaftEntry* pRollBackEntry = ths->pLogStore->getEntry(ths->pLogStore, index);
assert(pRollBackEntry != NULL);
// if (pRollBackEntry->msgType != TDMT_VND_SYNC_NOOP) {
if (syncUtilUserRollback(pRollBackEntry->msgType)) {
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);
SFsmCbMeta cbMeta;
cbMeta.index = pRollBackEntry->index;
cbMeta.isWeak = pRollBackEntry->isWeak;
cbMeta.code = 0;
cbMeta.state = ths->state;
cbMeta.seqNum = pRollBackEntry->seqNum;
ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, cbMeta);
// delete confict entries
ths->pLogStore->truncate(ths->pLogStore, extraIndex);
// make log same
SSyncRaftEntry* pAppendEntry;
code = syncNodeMakeLogSame(ths, pMsg, &pAppendEntry);
ASSERT(code == 0);
ASSERT(pAppendEntry != NULL);
// append new entries
ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry);
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
ASSERT(code == 0);
// pre commit
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
if (ths->pFsm != NULL) {
// if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
SFsmCbMeta cbMeta;
cbMeta.index = pAppendEntry->index;
cbMeta.isWeak = pAppendEntry->isWeak;
cbMeta.code = 2;
cbMeta.state = ths->state;
cbMeta.seqNum = pAppendEntry->seqNum;
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
code = syncNodePreCommit(ths, pAppendEntry);
ASSERT(code == 0);
// free memory
} else if (hasExtraEntries && !hasAppendEntries) {
......@@ -641,38 +622,26 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
} else if (!hasExtraEntries && hasAppendEntries) {
SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
assert(pAppendEntry != NULL);
ASSERT(pAppendEntry != NULL);
// append new entries
ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry);
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
ASSERT(code == 0);
// pre commit
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
if (ths->pFsm != NULL) {
// if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
SFsmCbMeta cbMeta;
cbMeta.index = pAppendEntry->index;
cbMeta.isWeak = pAppendEntry->isWeak;
cbMeta.code = 3;
cbMeta.state = ths->state;
cbMeta.seqNum = pAppendEntry->seqNum;
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
code = syncNodePreCommit(ths, pAppendEntry);
ASSERT(code == 0);
// free memory
} else if (!hasExtraEntries && !hasAppendEntries) {
// do nothing
} else {
// prepare response msg
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
pReply->srcId = ths->myRaftId;
pReply->destId = pMsg->srcId;
......@@ -685,6 +654,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
pReply->matchIndex = pMsg->prevLogIndex;
// send response
SRpcMsg rpcMsg;
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
......@@ -693,7 +663,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
// maybe update commit index from leader
if (pMsg->commitIndex > ths->commitIndex) {
// has commit entry in local
if (pMsg->commitIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
if (pMsg->commitIndex <= ths->pLogStore->syncLogLastIndex(ths->pLogStore)) {
SyncIndex beginIndex = ths->commitIndex + 1;
SyncIndex endIndex = pMsg->commitIndex;
......@@ -701,122 +671,11 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
ths->commitIndex = pMsg->commitIndex;
// call back Wal
ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex);
// execute fsm
if (ths->pFsm != NULL) {
for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
// notice! wal maybe deleted, update firstVer
SyncIndex walFirstVer = walGetFirstVer(ths->pWal);
if (i != SYNC_INDEX_INVALID && i >= walFirstVer) {
SSyncRaftEntry* pEntry = ths->pLogStore->getEntry(ths->pLogStore, i);
assert(pEntry != NULL);
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg);
if (ths->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) {
SFsmCbMeta cbMeta;
cbMeta.index = pEntry->index;
cbMeta.isWeak = pEntry->isWeak;
cbMeta.code = 0;
cbMeta.state = ths->state;
cbMeta.seqNum = pEntry->seqNum;
cbMeta.term = pEntry->term;
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
cbMeta.flag = 0x11;
SSnapshot snapshot;
ASSERT(ths->pFsm->FpGetSnapshot != NULL);
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);
bool needExecute = true;
if (cbMeta.index <= snapshot.lastApplyIndex) {
needExecute = false;
if (needExecute) {
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta);
code = ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex);
ASSERT(code == 0);
// config change
if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) {
SSyncCfg oldSyncCfg = ths->pRaftCfg->cfg;
SSyncCfg newSyncCfg;
int32_t ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg);
ASSERT(ret == 0);
// update new config myIndex
bool hit = false;
for (int i = 0; i < newSyncCfg.replicaNum; ++i) {
if (strcmp(ths->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[i].nodeFqdn) == 0 &&
ths->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[i].nodePort) {
newSyncCfg.myIndex = i;
hit = true;
SReConfigCbMeta cbMeta = {0};
bool isDrop;
// I am in newConfig
if (hit) {
syncNodeUpdateConfig(ths, &newSyncCfg, &isDrop);
// change isStandBy to normal
if (!isDrop) {
if (ths->state == TAOS_SYNC_STATE_LEADER) {
} else {
char* sOld = syncCfg2Str(&oldSyncCfg);
char* sNew = syncCfg2Str(&newSyncCfg);
sInfo("==config change== 0x11 old:%s new:%s isDrop:%d \n", sOld, sNew, isDrop);
// always call FpReConfigCb
if (ths->pFsm->FpReConfigCb != NULL) {
cbMeta.code = 0;
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
cbMeta.index = pEntry->index;
cbMeta.term = pEntry->term;
cbMeta.oldCfg = oldSyncCfg;
cbMeta.flag = 0x11;
cbMeta.isDrop = isDrop;
ths->pFsm->FpReConfigCb(ths->pFsm, newSyncCfg, cbMeta);
// restore finish
if (pEntry->index == ths->pLogStore->getLastIndex(ths->pLogStore)) {
if (ths->restoreFinish == false) {
if (ths->pFsm->FpRestoreFinishCb != NULL) {
ths->restoreFinish = true;
sInfo("==syncNodeOnAppendEntriesSnapshotCb== restoreFinish set true %p vgId:%d", ths, ths->vgId);
sInfo("==syncNodeOnAppendEntriesSnapshotCb== RestoreFinish tsem_post %p", ths);
code = syncNodeCommit(ths, beginIndex, endIndex, 0x11);
ASSERT(code == 0);
......@@ -98,18 +98,20 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p
int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
int32_t ret = 0;
// print log
char logBuf[128] = {0};
snprintf(logBuf, sizeof(logBuf), "==syncNodeOnAppendEntriesReplySnapshotCb== term:%lu", ths->pRaftStore->currentTerm);
snprintf(logBuf, sizeof(logBuf), "recv SyncAppendEntriesReply, term:%lu", ths->pRaftStore->currentTerm);
syncAppendEntriesReplyLog2(logBuf, pMsg);
// drop stale response
if (pMsg->term < ths->pRaftStore->currentTerm) {
sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term,
sTrace("recv SyncAppendEntriesReply, drop stale response, receive_term:%lu current_term:%lu", pMsg->term,
return ret;
syncIndexMgrLog2("==syncNodeOnAppendEntriesReplySnapshotCb== before pNextIndex", ths->pNextIndex);
syncIndexMgrLog2("==syncNodeOnAppendEntriesReplySnapshotCb== before pMatchIndex", ths->pMatchIndex);
syncIndexMgrLog2("recv SyncAppendEntriesReply, before pNextIndex:", ths->pNextIndex);
syncIndexMgrLog2("recv SyncAppendEntriesReply before pMatchIndex:", ths->pMatchIndex);
// no need this code, because if I receive reply.term, then I must have sent for that term.
// if (pMsg->term > ths->pRaftStore->currentTerm) {
......@@ -118,14 +120,14 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
if (pMsg->term > ths->pRaftStore->currentTerm) {
char logBuf[128] = {0};
snprintf(logBuf, sizeof(logBuf), "syncNodeOnAppendEntriesReplySnapshotCb error term, receive:%lu current:%lu",
snprintf(logBuf, sizeof(logBuf), "recv SyncAppendEntriesReply, error term, receive_term:%lu current_term:%lu",
pMsg->term, ths->pRaftStore->currentTerm);
syncNodeLog2(logBuf, ths);
sError("%s", logBuf);
return ret;
assert(pMsg->term == ths->pRaftStore->currentTerm);
ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
if (pMsg->success) {
// nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1]
......@@ -143,14 +145,25 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
// notice! int64, uint64
if (nextIndex > SYNC_INDEX_BEGIN) {
// has snapshot
if (syncNodeHasSnapshot(ths)) {
SSnapshot snapshot;
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);
if (nextIndex <= snapshot.lastApplyIndex) {
nextIndex = snapshot.lastApplyIndex + 1;
sInfo("reset new nextIndex %ld, snapshot.lastApplyIndex:%ld", nextIndex, snapshot.lastApplyIndex);
} else {
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex);
syncIndexMgrLog2("==syncNodeOnAppendEntriesReplySnapshotCb== after pNextIndex", ths->pNextIndex);
syncIndexMgrLog2("==syncNodeOnAppendEntriesReplySnapshotCb== after pMatchIndex", ths->pMatchIndex);
syncIndexMgrLog2("recv SyncAppendEntriesReply, after pNextIndex:", ths->pNextIndex);
syncIndexMgrLog2("recv SyncAppendEntriesReply, after pMatchIndex:", ths->pMatchIndex);
return ret;
\ No newline at end of file
......@@ -94,6 +94,10 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
// execute fsm
if (pSyncNode->pFsm != NULL) {
int32_t code = syncNodeCommit(pSyncNode, beginIndex, endIndex, 0x1);
ASSERT(code == 0);
#if 0
for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, i);
......@@ -201,6 +205,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
......@@ -51,7 +51,7 @@ int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode) {
int32_t syncNodeRequestVotePeersSnapshot(SSyncNode* pSyncNode) {
assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
int32_t ret = 0;
for (int i = 0; i < pSyncNode->peersNum; ++i) {
......@@ -1704,3 +1704,115 @@ const char* syncStr(ESyncState state) {
return "ERROR";
int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
int32_t code = 0;
// maybe execute by leader, skip snapshot
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
if (ths->pFsm->FpGetSnapshot != NULL) {
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);
if (beginIndex <= snapshot.lastApplyIndex) {
beginIndex = snapshot.lastApplyIndex + 1;
// execute fsm
if (ths->pFsm != NULL) {
for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
SSyncRaftEntry* pEntry;
code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, i, &pEntry);
ASSERT(code == 0);
ASSERT(pEntry != NULL);
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg);
if (ths->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) {
SFsmCbMeta cbMeta;
cbMeta.index = pEntry->index;
cbMeta.isWeak = pEntry->isWeak;
cbMeta.code = 0;
cbMeta.state = ths->state;
cbMeta.seqNum = pEntry->seqNum;
cbMeta.term = pEntry->term;
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
cbMeta.flag = flag;
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta);
// config change
if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) {
SSyncCfg oldSyncCfg = ths->pRaftCfg->cfg;
SSyncCfg newSyncCfg;
int32_t ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg);
ASSERT(ret == 0);
// update new config myIndex
bool hit = false;
for (int i = 0; i < newSyncCfg.replicaNum; ++i) {
if (strcmp(ths->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[i].nodeFqdn) == 0 &&
ths->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[i].nodePort) {
newSyncCfg.myIndex = i;
hit = true;
SReConfigCbMeta cbMeta = {0};
bool isDrop;
// I am in newConfig
if (hit) {
syncNodeUpdateConfig(ths, &newSyncCfg, &isDrop);
// change isStandBy to normal
if (!isDrop) {
if (ths->state == TAOS_SYNC_STATE_LEADER) {
} else {
char* sOld = syncCfg2Str(&oldSyncCfg);
char* sNew = syncCfg2Str(&newSyncCfg);
sInfo("==config change== 0x11 old:%s new:%s isDrop:%d \n", sOld, sNew, isDrop);
// always call FpReConfigCb
if (ths->pFsm->FpReConfigCb != NULL) {
cbMeta.code = 0;
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
cbMeta.index = pEntry->index;
cbMeta.term = pEntry->term;
cbMeta.oldCfg = oldSyncCfg;
cbMeta.flag = 0x11;
cbMeta.isDrop = isDrop;
ths->pFsm->FpReConfigCb(ths->pFsm, newSyncCfg, cbMeta);
// restore finish
if (pEntry->index == ths->pLogStore->syncLogLastIndex(ths->pLogStore)) {
if (ths->restoreFinish == false) {
if (ths->pFsm->FpRestoreFinishCb != NULL) {
ths->restoreFinish = true;
sInfo("restore finish %p vgId:%d", ths, ths->vgId);
return 0;
\ No newline at end of file
......@@ -117,91 +117,36 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {
int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
assert(pSyncNode->state == TAOS_SYNC_STATE_LEADER);
syncIndexMgrLog2("==syncNodeAppendEntriesPeersSnapshot== pNextIndex", pSyncNode->pNextIndex);
syncIndexMgrLog2("==syncNodeAppendEntriesPeersSnapshot== pMatchIndex", pSyncNode->pMatchIndex);
logStoreSimpleLog2("==syncNodeAppendEntriesPeersSnapshot==", pSyncNode->pLogStore);
syncIndexMgrLog2("begin append entries peers pNextIndex:", pSyncNode->pNextIndex);
syncIndexMgrLog2("begin append entries peers pMatchIndex:", pSyncNode->pMatchIndex);
logStoreSimpleLog2("begin append entries peers LogStore:", pSyncNode->pLogStore);
int32_t ret = 0;
for (int i = 0; i < pSyncNode->peersNum; ++i) {
SRaftId* pDestId = &(pSyncNode->peersId[i]);
// next index
SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId);
SyncIndex preLogIndex;
SyncTerm preLogTerm;
// pre index, pre term
SyncIndex preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex);
SyncTerm preLogTerm = syncNodeGetPreTerm(pSyncNode, nextIndex);
// batch optimized
// SyncIndex lastIndex = syncUtilMinIndex(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore), nextIndex);
// sending snapshot finish?
bool snapshotSendingFinish = false;
SSyncSnapshotSender* pSender = NULL;
for (int i = 0; i < pSyncNode->replicaNum; ++i) {
if (syncUtilSameId(pDestId, &((pSyncNode->replicasId)[i]))) {
pSender = (pSyncNode->senders)[i];
ASSERT(pSender != NULL);
snapshotSendingFinish = (pSender->finish) && (pSender->term == pSyncNode->pRaftStore->currentTerm);
if (snapshotSendingFinish) {
sInfo("snapshotSendingFinish! term:%lu", pSender->term);
if ((syncNodeIsIndexInSnapshot(pSyncNode, nextIndex - 1) && !snapshotSendingFinish) ||
syncNodeIsIndexInSnapshot(pSyncNode, nextIndex)) {
// will send this msg until snapshot receive finish!
SSnapshot snapshot = pSender->snapshot;
sInfo("nextIndex:%ld in snapshot: <lastApplyIndex:%ld, lastApplyTerm:%lu>, begin snapshot", nextIndex,
snapshot.lastApplyIndex, snapshot.lastApplyTerm);
// do not use next index
// always send from snapshot.lastApplyIndex + 1, and wait for snapshot transfer finish
preLogIndex = snapshot.lastApplyIndex;
preLogTerm = snapshot.lastApplyTerm;
// update next index!
syncIndexMgrSetIndex(pSyncNode->pNextIndex, pDestId, snapshot.lastApplyIndex + 1);
// to claim leader
SyncAppendEntries* pMsg = syncAppendEntriesBuild(0, pSyncNode->vgId);
assert(pMsg != NULL);
pMsg->srcId = pSyncNode->myRaftId;
pMsg->destId = *pDestId;
pMsg->term = pSyncNode->pRaftStore->currentTerm;
pMsg->prevLogIndex = preLogIndex;
pMsg->prevLogTerm = preLogTerm;
pMsg->commitIndex = pSyncNode->commitIndex;
syncAppendEntriesLog2("==syncNodeAppendEntriesPeersSnapshot==", pMsg);
// send AppendEntries
syncNodeAppendEntries(pSyncNode, pDestId, pMsg);
if (!snapshotSendingFinish) {
SSyncSnapshotSender* pSender = NULL;
for (int i = 0; i < pSyncNode->replicaNum; ++i) {
if (syncUtilSameId(&((pSyncNode->replicasId)[i]), pDestId)) {
pSender = (pSyncNode->senders)[i];
ASSERT(pSender != NULL);
// prepare entry
SyncAppendEntries* pMsg = NULL;
} else {
ret = syncNodeGetPreIndexTerm(pSyncNode, nextIndex, &preLogIndex, &preLogTerm);
ASSERT(ret == 0);
SSyncRaftEntry* pEntry;
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, nextIndex, &pEntry);
ASSERT(code == 0);
SyncAppendEntries* pMsg = NULL;
SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, nextIndex);
if (pEntry != NULL) {
pMsg = syncAppendEntriesBuild(pEntry->bytes, pSyncNode->vgId);
assert(pMsg != NULL);
// add pEntry into msg
uint32_t len;
......@@ -213,12 +158,13 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
} else {
// maybe overflow, send empty record
// no entry in log
pMsg = syncAppendEntriesBuild(0, pSyncNode->vgId);
assert(pMsg != NULL);
assert(pMsg != NULL);
// prepare msg
pMsg->srcId = pSyncNode->myRaftId;
pMsg->destId = *pDestId;
pMsg->term = pSyncNode->pRaftStore->currentTerm;
......@@ -226,13 +172,10 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
pMsg->prevLogTerm = preLogTerm;
pMsg->commitIndex = pSyncNode->commitIndex;
syncAppendEntriesLog2("==syncNodeAppendEntriesPeersSnapshot==", pMsg);
// send AppendEntries
// send msg
syncNodeAppendEntries(pSyncNode, pDestId, pMsg);
return ret;
......@@ -81,24 +81,35 @@ int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {
return ret;
static bool syncNodeOnRequestVoteLogOK(SSyncNode* pSyncNode, SyncRequestVote* pMsg) {
SyncTerm myLastTerm = syncNodeGetLastTerm(pSyncNode);
SyncIndex myLastIndex = syncNodeGetLastIndex(pSyncNode);
if (pMsg->lastLogTerm > myLastTerm) {
return true;
if (pMsg->lastLogTerm == myLastTerm && pMsg->lastLogIndex >= myLastIndex) {
return true;
return false;
int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg) {
int32_t ret = 0;
// print log
char logBuf[128] = {0};
snprintf(logBuf, sizeof(logBuf), "==syncNodeOnRequestVoteSnapshotCb== term:%lu", ths->pRaftStore->currentTerm);
snprintf(logBuf, sizeof(logBuf), "recv SyncRequestVote, currentTerm:%lu", ths->pRaftStore->currentTerm);
syncRequestVoteLog2(logBuf, pMsg);
// maybe update term
if (pMsg->term > ths->pRaftStore->currentTerm) {
syncNodeUpdateTerm(ths, pMsg->term);
assert(pMsg->term <= ths->pRaftStore->currentTerm);
SyncIndex lastIndex;
SyncTerm lastTerm;
ret = syncNodeGetLastIndexTerm(ths, &lastIndex, &lastTerm);
ASSERT(ret == 0);
ASSERT(pMsg->term <= ths->pRaftStore->currentTerm);
bool logOK = (pMsg->lastLogTerm > lastTerm) || ((pMsg->lastLogTerm == lastTerm) && (pMsg->lastLogIndex >= lastIndex));
bool logOK = syncNodeOnRequestVoteLogOK(ths, pMsg);
bool grant = (pMsg->term == ths->pRaftStore->currentTerm) && logOK &&
((!raftStoreHasVoted(ths->pRaftStore)) || (syncUtilSameId(&(ths->pRaftStore->voteFor), &(pMsg->srcId))));
if (grant) {
......@@ -110,6 +121,7 @@ int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg) {
// send msg
SyncRequestVoteReply* pReply = syncRequestVoteReplyBuild(ths->vgId);
pReply->srcId = ths->myRaftId;
pReply->destId = pMsg->srcId;
......@@ -96,12 +96,14 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg)
int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) {
int32_t ret = 0;
// print log
char logBuf[128] = {0};
snprintf(logBuf, sizeof(logBuf), "==syncNodeOnRequestVoteReplySnapshotCb== term:%lu", ths->pRaftStore->currentTerm);
snprintf(logBuf, sizeof(logBuf), "recv SyncRequestVoteReply, term:%lu", ths->pRaftStore->currentTerm);
syncRequestVoteReplyLog2(logBuf, pMsg);
// drop stale response
if (pMsg->term < ths->pRaftStore->currentTerm) {
sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term,
sTrace("recv SyncRequestVoteReply, drop stale response, receive_term:%lu current_term:%lu", pMsg->term,
return ret;
......@@ -114,14 +116,14 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl
if (pMsg->term > ths->pRaftStore->currentTerm) {
char logBuf[128] = {0};
snprintf(logBuf, sizeof(logBuf), "syncNodeOnRequestVoteReplySnapshotCb error term, receive:%lu current:%lu",
snprintf(logBuf, sizeof(logBuf), "recv SyncRequestVoteReply, error term, receive_term:%lu current_term:%lu",
pMsg->term, ths->pRaftStore->currentTerm);
syncNodePrint2(logBuf, ths);
sError("%s", logBuf);
return ret;
assert(pMsg->term == ths->pRaftStore->currentTerm);
ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
// This tallies votes even when the current state is not Candidate,
// but they won't be looked at, so it doesn't matter.
......@@ -90,7 +90,8 @@ void test1() {
SyncIndex syncStartIndex = syncNodeSyncStartIndex(pSyncNode);
sTrace("hasSnapshot:%d, lastApplyIndex:%ld, lastApplyTerm:%lu", hasSnapshot, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
sTrace("hasSnapshot:%d, lastApplyIndex:%ld, lastApplyTerm:%lu", hasSnapshot, snapshot.lastApplyIndex,
sTrace("lastIndex: %ld", lastIndex);
sTrace("lastTerm: %lu", lastTerm);
sTrace("syncStartIndex: %ld", syncStartIndex);
......@@ -143,7 +144,8 @@ void test2() {
SyncIndex syncStartIndex = syncNodeSyncStartIndex(pSyncNode);
sTrace("hasSnapshot:%d, lastApplyIndex:%ld, lastApplyTerm:%lu", hasSnapshot, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
sTrace("hasSnapshot:%d, lastApplyIndex:%ld, lastApplyTerm:%lu", hasSnapshot, snapshot.lastApplyIndex,
sTrace("lastIndex: %ld", lastIndex);
sTrace("lastTerm: %lu", lastTerm);
sTrace("syncStartIndex: %ld", syncStartIndex);
......@@ -158,7 +160,6 @@ void test2() {
void test3() {
......@@ -189,7 +190,8 @@ void test3() {
SyncIndex syncStartIndex = syncNodeSyncStartIndex(pSyncNode);
sTrace("hasSnapshot:%d, lastApplyIndex:%ld, lastApplyTerm:%lu", hasSnapshot, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
sTrace("hasSnapshot:%d, lastApplyIndex:%ld, lastApplyTerm:%lu", hasSnapshot, snapshot.lastApplyIndex,
sTrace("lastIndex: %ld", lastIndex);
sTrace("lastTerm: %lu", lastTerm);
sTrace("syncStartIndex: %ld", syncStartIndex);
......@@ -242,7 +244,8 @@ void test4() {
SyncIndex syncStartIndex = syncNodeSyncStartIndex(pSyncNode);
sTrace("hasSnapshot:%d, lastApplyIndex:%ld, lastApplyTerm:%lu", hasSnapshot, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
sTrace("hasSnapshot:%d, lastApplyIndex:%ld, lastApplyTerm:%lu", hasSnapshot, snapshot.lastApplyIndex,
sTrace("lastIndex: %ld", lastIndex);
sTrace("lastTerm: %lu", lastTerm);
sTrace("syncStartIndex: %ld", syncStartIndex);
......@@ -302,7 +305,8 @@ void test5() {
SyncIndex syncStartIndex = syncNodeSyncStartIndex(pSyncNode);
sTrace("hasSnapshot:%d, lastApplyIndex:%ld, lastApplyTerm:%lu", hasSnapshot, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
sTrace("hasSnapshot:%d, lastApplyIndex:%ld, lastApplyTerm:%lu", hasSnapshot, snapshot.lastApplyIndex,
sTrace("lastIndex: %ld", lastIndex);
sTrace("lastTerm: %lu", lastTerm);
sTrace("syncStartIndex: %ld", syncStartIndex);
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册