From 9baaf2cd0c0a9f54bdf7de7868b8660f314b6230 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 6 Jun 2022 20:55:59 +0800 Subject: [PATCH] enh(sync): add log index manager --- source/libs/sync/src/syncAppendEntriesReply.c | 18 +++++++++++++++++- source/libs/sync/src/syncReplication.c | 6 ++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index f246e4fb29..f5bcda1955 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -112,7 +112,13 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries } syncIndexMgrLog2("recv SyncAppendEntriesReply, before pNextIndex:", ths->pNextIndex); - syncIndexMgrLog2("recv SyncAppendEntriesReply before pMatchIndex:", ths->pMatchIndex); + syncIndexMgrLog2("recv SyncAppendEntriesReply, before pMatchIndex:", ths->pMatchIndex); + { + SSnapshot snapshot; + ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); + sTrace("recv SyncAppendEntriesReply, before snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu", + snapshot.lastApplyIndex, snapshot.lastApplyTerm); + } // no need this code, because if I receive reply.term, then I must have sent for that term. // if (pMsg->term > ths->pRaftStore->currentTerm) { @@ -133,6 +139,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries if (pMsg->success) { // nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1] syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1); + sTrace("update next index:%ld, success:%d", pMsg->matchIndex + 1, pMsg->success); // matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex] syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex); @@ -144,6 +151,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries } else { SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); + sTrace("begin to update next index:%ld, success:%d", nextIndex, pMsg->success); // notice! int64, uint64 if (nextIndex > SYNC_INDEX_BEGIN) { @@ -180,11 +188,19 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries } else { nextIndex = SYNC_INDEX_BEGIN; } + syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex); + sTrace("update next index:%ld, success:%d", nextIndex, pMsg->success); } syncIndexMgrLog2("recv SyncAppendEntriesReply, after pNextIndex:", ths->pNextIndex); syncIndexMgrLog2("recv SyncAppendEntriesReply, after pMatchIndex:", ths->pMatchIndex); + { + SSnapshot snapshot; + ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); + sTrace("recv SyncAppendEntriesReply, after snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu", + snapshot.lastApplyIndex, snapshot.lastApplyTerm); + } return ret; } \ No newline at end of file diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index d26ed6bcb4..ab3614bac3 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -122,6 +122,12 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) { syncIndexMgrLog2("begin append entries peers pNextIndex:", pSyncNode->pNextIndex); syncIndexMgrLog2("begin append entries peers pMatchIndex:", pSyncNode->pMatchIndex); logStoreSimpleLog2("begin append entries peers LogStore:", pSyncNode->pLogStore); + { + SSnapshot snapshot; + pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); + sTrace("begin append entries peers, snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu", + snapshot.lastApplyIndex, snapshot.lastApplyTerm); + } int32_t ret = 0; for (int i = 0; i < pSyncNode->peersNum; ++i) { -- GitLab