From af27303a4c0a8901b66781988ad56c4f0b7e66a1 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 6 Jun 2022 16:02:25 +0800 Subject: [PATCH] enh(sync): update raft core functions --- source/libs/sync/inc/syncElection.h | 1 + source/libs/sync/inc/syncInt.h | 2 + source/libs/sync/src/syncAppendEntries.c | 419 ++++++------------ source/libs/sync/src/syncAppendEntriesReply.c | 29 +- source/libs/sync/src/syncCommit.c | 5 + source/libs/sync/src/syncElection.c | 2 +- source/libs/sync/src/syncMain.c | 112 +++++ source/libs/sync/src/syncReplication.c | 137 ++---- source/libs/sync/src/syncRequestVote.c | 28 +- source/libs/sync/src/syncRequestVoteReply.c | 10 +- source/libs/sync/test/syncRaftLogTest3.cpp | 26 +- 11 files changed, 362 insertions(+), 409 deletions(-) diff --git a/source/libs/sync/inc/syncElection.h b/source/libs/sync/inc/syncElection.h index 7a3d848d46..128dbf4050 100644 --- a/source/libs/sync/inc/syncElection.h +++ b/source/libs/sync/inc/syncElection.h @@ -40,6 +40,7 @@ extern "C" { // int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode); int32_t syncNodeRequestVotePeersSnapshot(SSyncNode* pSyncNode); + int32_t syncNodeElect(SSyncNode* pSyncNode); int32_t syncNodeRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg); diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 352649f16f..3f269625e1 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -225,6 +225,8 @@ SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index); SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index); int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm); +int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag); + // for debug -------------- void syncNodePrint(SSyncNode* pObj); void syncNodePrint2(char* s, SSyncNode* pObj); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index fd31bfc148..0683e1d975 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -433,71 +433,133 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { return ret; } +static bool syncNodeOnAppendEntriesLogOK(SSyncNode* pSyncNode, SyncAppendEntries* pMsg) { + if (pMsg->prevLogIndex == SYNC_INDEX_INVALID) { + return true; + } + + SyncTerm myPreLogTerm = syncNodeGetPreTerm(pSyncNode, pMsg->prevLogIndex + 1); + SyncIndex myLastIndex = syncNodeGetLastIndex(pSyncNode); + + if (pMsg->prevLogIndex <= myLastIndex && pMsg->prevLogTerm == myPreLogTerm) { + return true; + } + + return false; +} + +static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg, SSyncRaftEntry** ppAppendEntry) { + int32_t code; + *ppAppendEntry = NULL; + + // not conflict by default + bool conflict = false; + + SyncIndex extraIndex = pMsg->prevLogIndex + 1; + SSyncRaftEntry* pExtraEntry; + code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, extraIndex, &pExtraEntry); + ASSERT(pExtraEntry != NULL); + + *ppAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); + ASSERT(*ppAppendEntry != NULL); + + // log not match, conflict, need delete + ASSERT(extraIndex == (*ppAppendEntry)->index); + if (pExtraEntry->term != (*ppAppendEntry)->term) { + conflict = true; + } + syncEntryDestory(pExtraEntry); + + if (conflict) { + // roll back + SyncIndex delBegin = ths->pLogStore->syncLogLastIndex(ths->pLogStore); + SyncIndex delEnd = extraIndex; + + sTrace("entry conflict:%d, delBegin:%ld, delEnd:%ld", conflict, delBegin, delEnd); + + // notice! reverse roll back! + for (SyncIndex index = delEnd; index >= delBegin; --index) { + if (ths->pFsm->FpRollBackCb != NULL) { + SSyncRaftEntry* pRollBackEntry; + code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, index, &pRollBackEntry); + ASSERT(code == 0); + ASSERT(pRollBackEntry != NULL); + + if (syncUtilUserRollback(pRollBackEntry->msgType)) { + SRpcMsg rpcMsg; + syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg); + + SFsmCbMeta cbMeta; + cbMeta.index = pRollBackEntry->index; + cbMeta.isWeak = pRollBackEntry->isWeak; + cbMeta.code = 0; + cbMeta.state = ths->state; + cbMeta.seqNum = pRollBackEntry->seqNum; + ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, cbMeta); + rpcFreeCont(rpcMsg.pCont); + } + + syncEntryDestory(pRollBackEntry); + } + } + + // delete confict entries + code = ths->pLogStore->syncLogTruncate(ths->pLogStore, extraIndex); + ASSERT(code == 0); + } + + return 0; +} + +static int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry) { + SRpcMsg rpcMsg; + syncEntry2OriginalRpc(pEntry, &rpcMsg); + if (ths->pFsm != NULL) { + if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) { + SFsmCbMeta cbMeta; + cbMeta.index = pEntry->index; + cbMeta.isWeak = pEntry->isWeak; + cbMeta.code = 2; + cbMeta.state = ths->state; + cbMeta.seqNum = pEntry->seqNum; + ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta); + } + } + rpcFreeCont(rpcMsg.pCont); + return 0; +} + int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMsg) { int32_t ret = 0; + int32_t code = 0; + // print log char logBuf[128] = {0}; - snprintf(logBuf, sizeof(logBuf), "==syncNodeOnAppendEntriesSnapshotCb== term:%lu", ths->pRaftStore->currentTerm); + snprintf(logBuf, sizeof(logBuf), "recv SyncAppendEntries, term:%lu", ths->pRaftStore->currentTerm); syncAppendEntriesLog2(logBuf, pMsg); + // maybe update term if (pMsg->term > ths->pRaftStore->currentTerm) { syncNodeUpdateTerm(ths, pMsg->term); } - assert(pMsg->term <= ths->pRaftStore->currentTerm); + ASSERT(pMsg->term <= ths->pRaftStore->currentTerm); // reset elect timer if (pMsg->term == ths->pRaftStore->currentTerm) { ths->leaderCache = pMsg->srcId; syncNodeResetElectTimer(ths); } - assert(pMsg->dataLen >= 0); - - SyncIndex localPreLogIndex; - SyncTerm localPreLogTerm; - - bool logOK; - - SyncIndex logFirstIndex = logStoreFirstIndex(ths->pLogStore); - SSnapshot snapshot; - ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); - if (logFirstIndex > snapshot.lastApplyIndex) { - logOK = false; + ASSERT(pMsg->dataLen >= 0); - } else if (syncNodeIsIndexInSnapshot(ths, pMsg->prevLogIndex)) { - SSnapshot snapshot; - ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); - - // maybe this assert is error, because replica take takesnapshot separately - // leader will reset next index to newest - ASSERT(pMsg->prevLogIndex == snapshot.lastApplyIndex); - - logOK = (pMsg->prevLogIndex == snapshot.lastApplyIndex) && (pMsg->prevLogTerm == snapshot.lastApplyTerm); - sTrace( - "1 - logOK:%d, pMsg->prevLogIndex:%ld, snapshot.lastApplyIndex:%ld, pMsg->prevLogTerm:%lu, " - "snapshot.lastApplyTerm:%lu", - logOK, pMsg->prevLogIndex, snapshot.lastApplyIndex, pMsg->prevLogTerm, snapshot.lastApplyTerm); - - } else { - ret = syncNodeGetPreIndexTerm(ths, pMsg->prevLogIndex + 1, &localPreLogIndex, &localPreLogTerm); - ASSERT(ret == 0); - - logOK = (pMsg->prevLogIndex == SYNC_INDEX_INVALID) || - ((pMsg->prevLogIndex >= SYNC_INDEX_BEGIN) && - (pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) && - (pMsg->prevLogTerm == localPreLogTerm)); - sTrace("2 - logOK:%d, pMsg->prevLogIndex:%ld, getLastIndex:%ld, pMsg->prevLogTerm:%lu, localPreLogTerm:%lu", logOK, - pMsg->prevLogIndex, ths->pLogStore->getLastIndex(ths->pLogStore), pMsg->prevLogTerm, localPreLogTerm); - } + bool logOK = syncNodeOnAppendEntriesLogOK(ths, pMsg); - // reject request + // case1, reject request if ((pMsg->term < ths->pRaftStore->currentTerm) || ((pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK)) { - sTrace( - "syncNodeOnAppendEntriesSnapshotCb --> reject, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, " - "ths->state:%d, " - "logOK:%d", - pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK); + sTrace("recv SyncAppendEntries, reject, receive_term:%lu, current_term:%lu, ths->state:%d, logOK:%d", pMsg->term, + ths->pRaftStore->currentTerm, ths->state, logOK); + // send response SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); pReply->srcId = ths->myRaftId; pReply->destId = pMsg->srcId; @@ -513,27 +575,10 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs return ret; } - SyncIndex localLastIndex; - SyncTerm localLastTerm; - if (logFirstIndex == SYNC_INDEX_INVALID) { - localLastIndex = ths->pLogStore->getLastIndex(ths->pLogStore); - localLastTerm = ths->pLogStore->getLastTerm(ths->pLogStore); - - } else if (logFirstIndex > snapshot.lastApplyIndex) { - localLastIndex = ths->pLogStore->getLastIndex(ths->pLogStore); - localLastTerm = ths->pLogStore->getLastTerm(ths->pLogStore); - - } else { - ret = syncNodeGetLastIndexTerm(ths, &localLastIndex, &localLastTerm); - ASSERT(ret == 0); - } - - // return to follower state + // case 2, return to follower state if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) { - sTrace( - "syncNodeOnAppendEntriesSnapshotCb --> return to follower, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, " - "ths->state:%d, logOK:%d", - pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK); + sTrace("recv SyncAppendEntries, return to follower, receive_term:%lu, current_term:%lu, ths->state:%d, logOK:%d", + pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK); syncNodeBecomeFollower(ths); @@ -541,99 +586,35 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs return ret; } - // accept request + // case 3, accept request if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_FOLLOWER && logOK) { - // preIndex = -1, or has preIndex entry in local log - assert(pMsg->prevLogIndex <= localLastIndex); - // has extra entries (> preIndex) in local log - bool hasExtraEntries = pMsg->prevLogIndex < localLastIndex; + SyncIndex myLastIndex = syncNodeGetLastIndex(ths); + bool hasExtraEntries = myLastIndex > pMsg->prevLogIndex; // has entries in SyncAppendEntries msg bool hasAppendEntries = pMsg->dataLen > 0; sTrace( - "syncNodeOnAppendEntriesSnapshotCb --> accept, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, " - "ths->state:%d, " - "logOK:%d, hasExtraEntries:%d, hasAppendEntries:%d", + "recv SyncAppendEntries, accept, receive_term:%lu, current_term:%lu, ths->state:%d, logOK:%d, " + "hasExtraEntries:%d, hasAppendEntries:%d", pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK, hasExtraEntries, hasAppendEntries); if (hasExtraEntries && hasAppendEntries) { - // not conflict by default - bool conflict = false; - - SyncIndex extraIndex = pMsg->prevLogIndex + 1; - SSyncRaftEntry* pExtraEntry = ths->pLogStore->getEntry(ths->pLogStore, extraIndex); - assert(pExtraEntry != NULL); - - SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); - assert(pAppendEntry != NULL); + // make log same + SSyncRaftEntry* pAppendEntry; + code = syncNodeMakeLogSame(ths, pMsg, &pAppendEntry); + ASSERT(code == 0); + ASSERT(pAppendEntry != NULL); - // log not match, conflict - assert(extraIndex == pAppendEntry->index); - if (pExtraEntry->term != pAppendEntry->term) { - conflict = true; - } - - if (conflict) { - // roll back - SyncIndex delBegin = ths->pLogStore->getLastIndex(ths->pLogStore); - SyncIndex delEnd = extraIndex; - - sTrace("syncNodeOnAppendEntriesSnapshotCb --> conflict:%d, delBegin:%ld, delEnd:%ld", conflict, delBegin, - delEnd); - - // notice! reverse roll back! - for (SyncIndex index = delEnd; index >= delBegin; --index) { - if (ths->pFsm->FpRollBackCb != NULL) { - SSyncRaftEntry* pRollBackEntry = ths->pLogStore->getEntry(ths->pLogStore, index); - assert(pRollBackEntry != NULL); - - // if (pRollBackEntry->msgType != TDMT_VND_SYNC_NOOP) { - if (syncUtilUserRollback(pRollBackEntry->msgType)) { - SRpcMsg rpcMsg; - syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg); - - SFsmCbMeta cbMeta; - cbMeta.index = pRollBackEntry->index; - cbMeta.isWeak = pRollBackEntry->isWeak; - cbMeta.code = 0; - cbMeta.state = ths->state; - cbMeta.seqNum = pRollBackEntry->seqNum; - ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, cbMeta); - rpcFreeCont(rpcMsg.pCont); - } - - syncEntryDestory(pRollBackEntry); - } - } - - // delete confict entries - ths->pLogStore->truncate(ths->pLogStore, extraIndex); - - // append new entries - ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry); + // append new entries + code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); + ASSERT(code == 0); - // pre commit - SRpcMsg rpcMsg; - syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); - if (ths->pFsm != NULL) { - // if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { - if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) { - SFsmCbMeta cbMeta; - cbMeta.index = pAppendEntry->index; - cbMeta.isWeak = pAppendEntry->isWeak; - cbMeta.code = 2; - cbMeta.state = ths->state; - cbMeta.seqNum = pAppendEntry->seqNum; - ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta); - } - } - rpcFreeCont(rpcMsg.pCont); - } + // pre commit + code = syncNodePreCommit(ths, pAppendEntry); + ASSERT(code == 0); - // free memory - syncEntryDestory(pExtraEntry); syncEntryDestory(pAppendEntry); } else if (hasExtraEntries && !hasAppendEntries) { @@ -641,38 +622,26 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs } else if (!hasExtraEntries && hasAppendEntries) { SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); - assert(pAppendEntry != NULL); + ASSERT(pAppendEntry != NULL); // append new entries - ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry); + code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); + ASSERT(code == 0); // pre commit - SRpcMsg rpcMsg; - syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); - if (ths->pFsm != NULL) { - // if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { - if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) { - SFsmCbMeta cbMeta; - cbMeta.index = pAppendEntry->index; - cbMeta.isWeak = pAppendEntry->isWeak; - cbMeta.code = 3; - cbMeta.state = ths->state; - cbMeta.seqNum = pAppendEntry->seqNum; - ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta); - } - } - rpcFreeCont(rpcMsg.pCont); + code = syncNodePreCommit(ths, pAppendEntry); + ASSERT(code == 0); - // free memory syncEntryDestory(pAppendEntry); } else if (!hasExtraEntries && !hasAppendEntries) { // do nothing } else { - assert(0); + ASSERT(0); } + // prepare response msg SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); pReply->srcId = ths->myRaftId; pReply->destId = pMsg->srcId; @@ -685,6 +654,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs pReply->matchIndex = pMsg->prevLogIndex; } + // send response SRpcMsg rpcMsg; syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); @@ -693,7 +663,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs // maybe update commit index from leader if (pMsg->commitIndex > ths->commitIndex) { // has commit entry in local - if (pMsg->commitIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) { + if (pMsg->commitIndex <= ths->pLogStore->syncLogLastIndex(ths->pLogStore)) { SyncIndex beginIndex = ths->commitIndex + 1; SyncIndex endIndex = pMsg->commitIndex; @@ -701,122 +671,11 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs ths->commitIndex = pMsg->commitIndex; // call back Wal - ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex); - - // execute fsm - if (ths->pFsm != NULL) { - for (SyncIndex i = beginIndex; i <= endIndex; ++i) { - // notice! wal maybe deleted, update firstVer - SyncIndex walFirstVer = walGetFirstVer(ths->pWal); - - if (i != SYNC_INDEX_INVALID && i >= walFirstVer) { - SSyncRaftEntry* pEntry = ths->pLogStore->getEntry(ths->pLogStore, i); - assert(pEntry != NULL); - - SRpcMsg rpcMsg; - syncEntry2OriginalRpc(pEntry, &rpcMsg); + code = ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex); + ASSERT(code == 0); - if (ths->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) { - SFsmCbMeta cbMeta; - cbMeta.index = pEntry->index; - cbMeta.isWeak = pEntry->isWeak; - cbMeta.code = 0; - cbMeta.state = ths->state; - cbMeta.seqNum = pEntry->seqNum; - cbMeta.term = pEntry->term; - cbMeta.currentTerm = ths->pRaftStore->currentTerm; - cbMeta.flag = 0x11; - - SSnapshot snapshot; - ASSERT(ths->pFsm->FpGetSnapshot != NULL); - ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); - - bool needExecute = true; - if (cbMeta.index <= snapshot.lastApplyIndex) { - needExecute = false; - } - - if (needExecute) { - ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta); - } - } - - // config change - if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) { - SSyncCfg oldSyncCfg = ths->pRaftCfg->cfg; - - SSyncCfg newSyncCfg; - int32_t ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg); - ASSERT(ret == 0); - - // update new config myIndex - bool hit = false; - for (int i = 0; i < newSyncCfg.replicaNum; ++i) { - if (strcmp(ths->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[i].nodeFqdn) == 0 && - ths->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[i].nodePort) { - newSyncCfg.myIndex = i; - hit = true; - break; - } - } - - SReConfigCbMeta cbMeta = {0}; - bool isDrop; - - // I am in newConfig - if (hit) { - syncNodeUpdateConfig(ths, &newSyncCfg, &isDrop); - - // change isStandBy to normal - if (!isDrop) { - if (ths->state == TAOS_SYNC_STATE_LEADER) { - syncNodeBecomeLeader(ths); - } else { - syncNodeBecomeFollower(ths); - } - } - - char* sOld = syncCfg2Str(&oldSyncCfg); - char* sNew = syncCfg2Str(&newSyncCfg); - sInfo("==config change== 0x11 old:%s new:%s isDrop:%d \n", sOld, sNew, isDrop); - taosMemoryFree(sOld); - taosMemoryFree(sNew); - } - - // always call FpReConfigCb - if (ths->pFsm->FpReConfigCb != NULL) { - cbMeta.code = 0; - cbMeta.currentTerm = ths->pRaftStore->currentTerm; - cbMeta.index = pEntry->index; - cbMeta.term = pEntry->term; - cbMeta.oldCfg = oldSyncCfg; - cbMeta.flag = 0x11; - cbMeta.isDrop = isDrop; - ths->pFsm->FpReConfigCb(ths->pFsm, newSyncCfg, cbMeta); - } - } - - // restore finish - if (pEntry->index == ths->pLogStore->getLastIndex(ths->pLogStore)) { - if (ths->restoreFinish == false) { - if (ths->pFsm->FpRestoreFinishCb != NULL) { - ths->pFsm->FpRestoreFinishCb(ths->pFsm); - } - ths->restoreFinish = true; - sInfo("==syncNodeOnAppendEntriesSnapshotCb== restoreFinish set true %p vgId:%d", ths, ths->vgId); - - /* - tsem_post(&ths->restoreSem); - sInfo("==syncNodeOnAppendEntriesSnapshotCb== RestoreFinish tsem_post %p", ths); - */ - } - } - - rpcFreeCont(rpcMsg.pCont); - syncEntryDestory(pEntry); - } - } - } + code = syncNodeCommit(ths, beginIndex, endIndex, 0x11); + ASSERT(code == 0); } } } diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 8ba169d093..b4520f55a9 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -98,18 +98,20 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { int32_t ret = 0; + // print log char logBuf[128] = {0}; - snprintf(logBuf, sizeof(logBuf), "==syncNodeOnAppendEntriesReplySnapshotCb== term:%lu", ths->pRaftStore->currentTerm); + snprintf(logBuf, sizeof(logBuf), "recv SyncAppendEntriesReply, term:%lu", ths->pRaftStore->currentTerm); syncAppendEntriesReplyLog2(logBuf, pMsg); + // drop stale response if (pMsg->term < ths->pRaftStore->currentTerm) { - sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term, + sTrace("recv SyncAppendEntriesReply, drop stale response, receive_term:%lu current_term:%lu", pMsg->term, ths->pRaftStore->currentTerm); return ret; } - syncIndexMgrLog2("==syncNodeOnAppendEntriesReplySnapshotCb== before pNextIndex", ths->pNextIndex); - syncIndexMgrLog2("==syncNodeOnAppendEntriesReplySnapshotCb== before pMatchIndex", ths->pMatchIndex); + syncIndexMgrLog2("recv SyncAppendEntriesReply, before pNextIndex:", ths->pNextIndex); + syncIndexMgrLog2("recv SyncAppendEntriesReply before pMatchIndex:", ths->pMatchIndex); // no need this code, because if I receive reply.term, then I must have sent for that term. // if (pMsg->term > ths->pRaftStore->currentTerm) { @@ -118,14 +120,14 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries if (pMsg->term > ths->pRaftStore->currentTerm) { char logBuf[128] = {0}; - snprintf(logBuf, sizeof(logBuf), "syncNodeOnAppendEntriesReplySnapshotCb error term, receive:%lu current:%lu", + snprintf(logBuf, sizeof(logBuf), "recv SyncAppendEntriesReply, error term, receive_term:%lu current_term:%lu", pMsg->term, ths->pRaftStore->currentTerm); syncNodeLog2(logBuf, ths); sError("%s", logBuf); return ret; } - assert(pMsg->term == ths->pRaftStore->currentTerm); + ASSERT(pMsg->term == ths->pRaftStore->currentTerm); if (pMsg->success) { // nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1] @@ -143,14 +145,25 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries // notice! int64, uint64 if (nextIndex > SYNC_INDEX_BEGIN) { --nextIndex; + + // has snapshot + if (syncNodeHasSnapshot(ths)) { + SSnapshot snapshot; + ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); + if (nextIndex <= snapshot.lastApplyIndex) { + nextIndex = snapshot.lastApplyIndex + 1; + sInfo("reset new nextIndex %ld, snapshot.lastApplyIndex:%ld", nextIndex, snapshot.lastApplyIndex); + } + } + } else { nextIndex = SYNC_INDEX_BEGIN; } syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex); } - syncIndexMgrLog2("==syncNodeOnAppendEntriesReplySnapshotCb== after pNextIndex", ths->pNextIndex); - syncIndexMgrLog2("==syncNodeOnAppendEntriesReplySnapshotCb== after pMatchIndex", ths->pMatchIndex); + syncIndexMgrLog2("recv SyncAppendEntriesReply, after pNextIndex:", ths->pNextIndex); + syncIndexMgrLog2("recv SyncAppendEntriesReply, after pMatchIndex:", ths->pMatchIndex); return ret; } \ No newline at end of file diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index b683ab12b4..74233ca4f3 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -94,6 +94,10 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { // execute fsm if (pSyncNode->pFsm != NULL) { + int32_t code = syncNodeCommit(pSyncNode, beginIndex, endIndex, 0x1); + ASSERT(code == 0); + +#if 0 for (SyncIndex i = beginIndex; i <= endIndex; ++i) { if (i != SYNC_INDEX_INVALID) { SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, i); @@ -201,6 +205,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { syncEntryDestory(pEntry); } } +#endif } } } diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index 4a470af9c1..fdebbe3990 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -51,7 +51,7 @@ int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode) { } int32_t syncNodeRequestVotePeersSnapshot(SSyncNode* pSyncNode) { - assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE); + ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE); int32_t ret = 0; for (int i = 0; i < pSyncNode->peersNum; ++i) { diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 04db39ec09..fe6d2b672a 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1704,3 +1704,115 @@ const char* syncStr(ESyncState state) { return "ERROR"; } } + +int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) { + int32_t code = 0; + + // maybe execute by leader, skip snapshot + SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; + if (ths->pFsm->FpGetSnapshot != NULL) { + ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); + } + if (beginIndex <= snapshot.lastApplyIndex) { + beginIndex = snapshot.lastApplyIndex + 1; + } + + // execute fsm + if (ths->pFsm != NULL) { + for (SyncIndex i = beginIndex; i <= endIndex; ++i) { + if (i != SYNC_INDEX_INVALID) { + SSyncRaftEntry* pEntry; + code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, i, &pEntry); + ASSERT(code == 0); + ASSERT(pEntry != NULL); + + SRpcMsg rpcMsg; + syncEntry2OriginalRpc(pEntry, &rpcMsg); + + if (ths->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) { + SFsmCbMeta cbMeta; + cbMeta.index = pEntry->index; + cbMeta.isWeak = pEntry->isWeak; + cbMeta.code = 0; + cbMeta.state = ths->state; + cbMeta.seqNum = pEntry->seqNum; + cbMeta.term = pEntry->term; + cbMeta.currentTerm = ths->pRaftStore->currentTerm; + cbMeta.flag = flag; + + ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta); + } + + // config change + if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) { + SSyncCfg oldSyncCfg = ths->pRaftCfg->cfg; + + SSyncCfg newSyncCfg; + int32_t ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg); + ASSERT(ret == 0); + + // update new config myIndex + bool hit = false; + for (int i = 0; i < newSyncCfg.replicaNum; ++i) { + if (strcmp(ths->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[i].nodeFqdn) == 0 && + ths->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[i].nodePort) { + newSyncCfg.myIndex = i; + hit = true; + break; + } + } + + SReConfigCbMeta cbMeta = {0}; + bool isDrop; + + // I am in newConfig + if (hit) { + syncNodeUpdateConfig(ths, &newSyncCfg, &isDrop); + + // change isStandBy to normal + if (!isDrop) { + if (ths->state == TAOS_SYNC_STATE_LEADER) { + syncNodeBecomeLeader(ths); + } else { + syncNodeBecomeFollower(ths); + } + } + + char* sOld = syncCfg2Str(&oldSyncCfg); + char* sNew = syncCfg2Str(&newSyncCfg); + sInfo("==config change== 0x11 old:%s new:%s isDrop:%d \n", sOld, sNew, isDrop); + taosMemoryFree(sOld); + taosMemoryFree(sNew); + } + + // always call FpReConfigCb + if (ths->pFsm->FpReConfigCb != NULL) { + cbMeta.code = 0; + cbMeta.currentTerm = ths->pRaftStore->currentTerm; + cbMeta.index = pEntry->index; + cbMeta.term = pEntry->term; + cbMeta.oldCfg = oldSyncCfg; + cbMeta.flag = 0x11; + cbMeta.isDrop = isDrop; + ths->pFsm->FpReConfigCb(ths->pFsm, newSyncCfg, cbMeta); + } + } + + // restore finish + if (pEntry->index == ths->pLogStore->syncLogLastIndex(ths->pLogStore)) { + if (ths->restoreFinish == false) { + if (ths->pFsm->FpRestoreFinishCb != NULL) { + ths->pFsm->FpRestoreFinishCb(ths->pFsm); + } + ths->restoreFinish = true; + sInfo("restore finish %p vgId:%d", ths, ths->vgId); + } + } + + rpcFreeCont(rpcMsg.pCont); + syncEntryDestory(pEntry); + } + } + } + return 0; +} \ No newline at end of file diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 4b9a8c51aa..d26ed6bcb4 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -117,121 +117,64 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) { } int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) { - assert(pSyncNode->state == TAOS_SYNC_STATE_LEADER); + ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER); - syncIndexMgrLog2("==syncNodeAppendEntriesPeersSnapshot== pNextIndex", pSyncNode->pNextIndex); - syncIndexMgrLog2("==syncNodeAppendEntriesPeersSnapshot== pMatchIndex", pSyncNode->pMatchIndex); - logStoreSimpleLog2("==syncNodeAppendEntriesPeersSnapshot==", pSyncNode->pLogStore); + syncIndexMgrLog2("begin append entries peers pNextIndex:", pSyncNode->pNextIndex); + syncIndexMgrLog2("begin append entries peers pMatchIndex:", pSyncNode->pMatchIndex); + logStoreSimpleLog2("begin append entries peers LogStore:", pSyncNode->pLogStore); int32_t ret = 0; for (int i = 0; i < pSyncNode->peersNum; ++i) { SRaftId* pDestId = &(pSyncNode->peersId[i]); + // next index SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId); - SyncIndex preLogIndex; - SyncTerm preLogTerm; + + // pre index, pre term + SyncIndex preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex); + SyncTerm preLogTerm = syncNodeGetPreTerm(pSyncNode, nextIndex); // batch optimized // SyncIndex lastIndex = syncUtilMinIndex(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore), nextIndex); - // sending snapshot finish? - bool snapshotSendingFinish = false; - SSyncSnapshotSender* pSender = NULL; - for (int i = 0; i < pSyncNode->replicaNum; ++i) { - if (syncUtilSameId(pDestId, &((pSyncNode->replicasId)[i]))) { - pSender = (pSyncNode->senders)[i]; - } - } - ASSERT(pSender != NULL); - snapshotSendingFinish = (pSender->finish) && (pSender->term == pSyncNode->pRaftStore->currentTerm); - if (snapshotSendingFinish) { - sInfo("snapshotSendingFinish! term:%lu", pSender->term); - } - - if ((syncNodeIsIndexInSnapshot(pSyncNode, nextIndex - 1) && !snapshotSendingFinish) || - syncNodeIsIndexInSnapshot(pSyncNode, nextIndex)) { - // will send this msg until snapshot receive finish! - - SSnapshot snapshot = pSender->snapshot; - sInfo("nextIndex:%ld in snapshot: , begin snapshot", nextIndex, - snapshot.lastApplyIndex, snapshot.lastApplyTerm); + // prepare entry + SyncAppendEntries* pMsg = NULL; - // do not use next index - // always send from snapshot.lastApplyIndex + 1, and wait for snapshot transfer finish + SSyncRaftEntry* pEntry; + int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, nextIndex, &pEntry); + ASSERT(code == 0); - preLogIndex = snapshot.lastApplyIndex; - preLogTerm = snapshot.lastApplyTerm; + if (pEntry != NULL) { + pMsg = syncAppendEntriesBuild(pEntry->bytes, pSyncNode->vgId); + ASSERT(pMsg != NULL); - // update next index! - syncIndexMgrSetIndex(pSyncNode->pNextIndex, pDestId, snapshot.lastApplyIndex + 1); + // add pEntry into msg + uint32_t len; + char* serialized = syncEntrySerialize(pEntry, &len); + assert(len == pEntry->bytes); + memcpy(pMsg->data, serialized, len); - // to claim leader - SyncAppendEntries* pMsg = syncAppendEntriesBuild(0, pSyncNode->vgId); - assert(pMsg != NULL); - pMsg->srcId = pSyncNode->myRaftId; - pMsg->destId = *pDestId; - pMsg->term = pSyncNode->pRaftStore->currentTerm; - pMsg->prevLogIndex = preLogIndex; - pMsg->prevLogTerm = preLogTerm; - pMsg->commitIndex = pSyncNode->commitIndex; - - syncAppendEntriesLog2("==syncNodeAppendEntriesPeersSnapshot==", pMsg); - - // send AppendEntries - syncNodeAppendEntries(pSyncNode, pDestId, pMsg); - syncAppendEntriesDestroy(pMsg); - - if (!snapshotSendingFinish) { - SSyncSnapshotSender* pSender = NULL; - for (int i = 0; i < pSyncNode->replicaNum; ++i) { - if (syncUtilSameId(&((pSyncNode->replicasId)[i]), pDestId)) { - pSender = (pSyncNode->senders)[i]; - break; - } - } - ASSERT(pSender != NULL); - snapshotSenderStart(pSender); - } + taosMemoryFree(serialized); + syncEntryDestory(pEntry); } else { - ret = syncNodeGetPreIndexTerm(pSyncNode, nextIndex, &preLogIndex, &preLogTerm); - ASSERT(ret == 0); - - SyncAppendEntries* pMsg = NULL; - SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, nextIndex); - if (pEntry != NULL) { - pMsg = syncAppendEntriesBuild(pEntry->bytes, pSyncNode->vgId); - assert(pMsg != NULL); - - // add pEntry into msg - uint32_t len; - char* serialized = syncEntrySerialize(pEntry, &len); - assert(len == pEntry->bytes); - memcpy(pMsg->data, serialized, len); - - taosMemoryFree(serialized); - syncEntryDestory(pEntry); - - } else { - // maybe overflow, send empty record - pMsg = syncAppendEntriesBuild(0, pSyncNode->vgId); - assert(pMsg != NULL); - } - - assert(pMsg != NULL); - pMsg->srcId = pSyncNode->myRaftId; - pMsg->destId = *pDestId; - pMsg->term = pSyncNode->pRaftStore->currentTerm; - pMsg->prevLogIndex = preLogIndex; - pMsg->prevLogTerm = preLogTerm; - pMsg->commitIndex = pSyncNode->commitIndex; - - syncAppendEntriesLog2("==syncNodeAppendEntriesPeersSnapshot==", pMsg); - - // send AppendEntries - syncNodeAppendEntries(pSyncNode, pDestId, pMsg); - syncAppendEntriesDestroy(pMsg); + // no entry in log + pMsg = syncAppendEntriesBuild(0, pSyncNode->vgId); + ASSERT(pMsg != NULL); } + + // prepare msg + ASSERT(pMsg != NULL); + pMsg->srcId = pSyncNode->myRaftId; + pMsg->destId = *pDestId; + pMsg->term = pSyncNode->pRaftStore->currentTerm; + pMsg->prevLogIndex = preLogIndex; + pMsg->prevLogTerm = preLogTerm; + pMsg->commitIndex = pSyncNode->commitIndex; + + // send msg + syncNodeAppendEntries(pSyncNode, pDestId, pMsg); + syncAppendEntriesDestroy(pMsg); } return ret; diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index 3eef2e7b1e..4b2d444596 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -81,24 +81,35 @@ int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) { return ret; } +static bool syncNodeOnRequestVoteLogOK(SSyncNode* pSyncNode, SyncRequestVote* pMsg) { + SyncTerm myLastTerm = syncNodeGetLastTerm(pSyncNode); + SyncIndex myLastIndex = syncNodeGetLastIndex(pSyncNode); + + if (pMsg->lastLogTerm > myLastTerm) { + return true; + } + if (pMsg->lastLogTerm == myLastTerm && pMsg->lastLogIndex >= myLastIndex) { + return true; + } + + return false; +} + int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg) { int32_t ret = 0; + // print log char logBuf[128] = {0}; - snprintf(logBuf, sizeof(logBuf), "==syncNodeOnRequestVoteSnapshotCb== term:%lu", ths->pRaftStore->currentTerm); + snprintf(logBuf, sizeof(logBuf), "recv SyncRequestVote, currentTerm:%lu", ths->pRaftStore->currentTerm); syncRequestVoteLog2(logBuf, pMsg); + // maybe update term if (pMsg->term > ths->pRaftStore->currentTerm) { syncNodeUpdateTerm(ths, pMsg->term); } - assert(pMsg->term <= ths->pRaftStore->currentTerm); - - SyncIndex lastIndex; - SyncTerm lastTerm; - ret = syncNodeGetLastIndexTerm(ths, &lastIndex, &lastTerm); - ASSERT(ret == 0); + ASSERT(pMsg->term <= ths->pRaftStore->currentTerm); - bool logOK = (pMsg->lastLogTerm > lastTerm) || ((pMsg->lastLogTerm == lastTerm) && (pMsg->lastLogIndex >= lastIndex)); + bool logOK = syncNodeOnRequestVoteLogOK(ths, pMsg); bool grant = (pMsg->term == ths->pRaftStore->currentTerm) && logOK && ((!raftStoreHasVoted(ths->pRaftStore)) || (syncUtilSameId(&(ths->pRaftStore->voteFor), &(pMsg->srcId)))); if (grant) { @@ -110,6 +121,7 @@ int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg) { syncNodeResetElectTimer(ths); } + // send msg SyncRequestVoteReply* pReply = syncRequestVoteReplyBuild(ths->vgId); pReply->srcId = ths->myRaftId; pReply->destId = pMsg->srcId; diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c index cff0167f1a..d2009fb355 100644 --- a/source/libs/sync/src/syncRequestVoteReply.c +++ b/source/libs/sync/src/syncRequestVoteReply.c @@ -96,12 +96,14 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) { int32_t ret = 0; + // print log char logBuf[128] = {0}; - snprintf(logBuf, sizeof(logBuf), "==syncNodeOnRequestVoteReplySnapshotCb== term:%lu", ths->pRaftStore->currentTerm); + snprintf(logBuf, sizeof(logBuf), "recv SyncRequestVoteReply, term:%lu", ths->pRaftStore->currentTerm); syncRequestVoteReplyLog2(logBuf, pMsg); + // drop stale response if (pMsg->term < ths->pRaftStore->currentTerm) { - sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term, + sTrace("recv SyncRequestVoteReply, drop stale response, receive_term:%lu current_term:%lu", pMsg->term, ths->pRaftStore->currentTerm); return ret; } @@ -114,14 +116,14 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl if (pMsg->term > ths->pRaftStore->currentTerm) { char logBuf[128] = {0}; - snprintf(logBuf, sizeof(logBuf), "syncNodeOnRequestVoteReplySnapshotCb error term, receive:%lu current:%lu", + snprintf(logBuf, sizeof(logBuf), "recv SyncRequestVoteReply, error term, receive_term:%lu current_term:%lu", pMsg->term, ths->pRaftStore->currentTerm); syncNodePrint2(logBuf, ths); sError("%s", logBuf); return ret; } - assert(pMsg->term == ths->pRaftStore->currentTerm); + ASSERT(pMsg->term == ths->pRaftStore->currentTerm); // This tallies votes even when the current state is not Candidate, // but they won't be looked at, so it doesn't matter. diff --git a/source/libs/sync/test/syncRaftLogTest3.cpp b/source/libs/sync/test/syncRaftLogTest3.cpp index 4ada395016..9dbc201966 100644 --- a/source/libs/sync/test/syncRaftLogTest3.cpp +++ b/source/libs/sync/test/syncRaftLogTest3.cpp @@ -76,7 +76,7 @@ void test1() { gSnapshotLastApplyIndex = -1; gSnapshotLastApplyTerm = 0; - bool hasSnapshot = syncNodeHasSnapshot(pSyncNode); + bool hasSnapshot = syncNodeHasSnapshot(pSyncNode); SSnapshot snapshot; pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); @@ -90,7 +90,8 @@ void test1() { SyncIndex syncStartIndex = syncNodeSyncStartIndex(pSyncNode); sTrace("test1"); - sTrace("hasSnapshot:%d, lastApplyIndex:%ld, lastApplyTerm:%lu", hasSnapshot, snapshot.lastApplyIndex, snapshot.lastApplyTerm); + sTrace("hasSnapshot:%d, lastApplyIndex:%ld, lastApplyTerm:%lu", hasSnapshot, snapshot.lastApplyIndex, + snapshot.lastApplyTerm); sTrace("lastIndex: %ld", lastIndex); sTrace("lastTerm: %lu", lastTerm); sTrace("syncStartIndex: %ld", syncStartIndex); @@ -133,7 +134,7 @@ void test2() { gSnapshotLastApplyIndex = -1; gSnapshotLastApplyTerm = 0; - bool hasSnapshot = syncNodeHasSnapshot(pSyncNode); + bool hasSnapshot = syncNodeHasSnapshot(pSyncNode); SSnapshot snapshot; pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); @@ -143,7 +144,8 @@ void test2() { SyncIndex syncStartIndex = syncNodeSyncStartIndex(pSyncNode); sTrace("test2"); - sTrace("hasSnapshot:%d, lastApplyIndex:%ld, lastApplyTerm:%lu", hasSnapshot, snapshot.lastApplyIndex, snapshot.lastApplyTerm); + sTrace("hasSnapshot:%d, lastApplyIndex:%ld, lastApplyTerm:%lu", hasSnapshot, snapshot.lastApplyIndex, + snapshot.lastApplyTerm); sTrace("lastIndex: %ld", lastIndex); sTrace("lastTerm: %lu", lastTerm); sTrace("syncStartIndex: %ld", syncStartIndex); @@ -158,7 +160,6 @@ void test2() { logStoreDestory(pLogStore); cleanup(); - } void test3() { @@ -176,7 +177,7 @@ void test3() { gSnapshotLastApplyIndex = 5; gSnapshotLastApplyTerm = 100; - bool hasSnapshot = syncNodeHasSnapshot(pSyncNode); + bool hasSnapshot = syncNodeHasSnapshot(pSyncNode); SSnapshot snapshot; pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); @@ -189,7 +190,8 @@ void test3() { SyncIndex syncStartIndex = syncNodeSyncStartIndex(pSyncNode); sTrace("test3"); - sTrace("hasSnapshot:%d, lastApplyIndex:%ld, lastApplyTerm:%lu", hasSnapshot, snapshot.lastApplyIndex, snapshot.lastApplyTerm); + sTrace("hasSnapshot:%d, lastApplyIndex:%ld, lastApplyTerm:%lu", hasSnapshot, snapshot.lastApplyIndex, + snapshot.lastApplyTerm); sTrace("lastIndex: %ld", lastIndex); sTrace("lastTerm: %lu", lastTerm); sTrace("syncStartIndex: %ld", syncStartIndex); @@ -232,7 +234,7 @@ void test4() { gSnapshotLastApplyIndex = 5; gSnapshotLastApplyTerm = 100; - bool hasSnapshot = syncNodeHasSnapshot(pSyncNode); + bool hasSnapshot = syncNodeHasSnapshot(pSyncNode); SSnapshot snapshot; pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); @@ -242,7 +244,8 @@ void test4() { SyncIndex syncStartIndex = syncNodeSyncStartIndex(pSyncNode); sTrace("test4"); - sTrace("hasSnapshot:%d, lastApplyIndex:%ld, lastApplyTerm:%lu", hasSnapshot, snapshot.lastApplyIndex, snapshot.lastApplyTerm); + sTrace("hasSnapshot:%d, lastApplyIndex:%ld, lastApplyTerm:%lu", hasSnapshot, snapshot.lastApplyIndex, + snapshot.lastApplyTerm); sTrace("lastIndex: %ld", lastIndex); sTrace("lastTerm: %lu", lastTerm); sTrace("syncStartIndex: %ld", syncStartIndex); @@ -292,7 +295,7 @@ void test5() { gSnapshotLastApplyIndex = 5; gSnapshotLastApplyTerm = 100; - bool hasSnapshot = syncNodeHasSnapshot(pSyncNode); + bool hasSnapshot = syncNodeHasSnapshot(pSyncNode); SSnapshot snapshot; pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); @@ -302,7 +305,8 @@ void test5() { SyncIndex syncStartIndex = syncNodeSyncStartIndex(pSyncNode); sTrace("test5"); - sTrace("hasSnapshot:%d, lastApplyIndex:%ld, lastApplyTerm:%lu", hasSnapshot, snapshot.lastApplyIndex, snapshot.lastApplyTerm); + sTrace("hasSnapshot:%d, lastApplyIndex:%ld, lastApplyTerm:%lu", hasSnapshot, snapshot.lastApplyIndex, + snapshot.lastApplyTerm); sTrace("lastIndex: %ld", lastIndex); sTrace("lastTerm: %lu", lastTerm); sTrace("syncStartIndex: %ld", syncStartIndex); -- GitLab