提交 9baaf2cd 编写于 作者: M Minghao Li

enh(sync): add log index manager

上级 2071c5a7
...@@ -112,7 +112,13 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries ...@@ -112,7 +112,13 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
} }
syncIndexMgrLog2("recv SyncAppendEntriesReply, before pNextIndex:", ths->pNextIndex); syncIndexMgrLog2("recv SyncAppendEntriesReply, before pNextIndex:", ths->pNextIndex);
syncIndexMgrLog2("recv SyncAppendEntriesReply before pMatchIndex:", ths->pMatchIndex); syncIndexMgrLog2("recv SyncAppendEntriesReply, before pMatchIndex:", ths->pMatchIndex);
{
SSnapshot snapshot;
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);
sTrace("recv SyncAppendEntriesReply, before snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu",
snapshot.lastApplyIndex, snapshot.lastApplyTerm);
}
// no need this code, because if I receive reply.term, then I must have sent for that term. // no need this code, because if I receive reply.term, then I must have sent for that term.
// if (pMsg->term > ths->pRaftStore->currentTerm) { // if (pMsg->term > ths->pRaftStore->currentTerm) {
...@@ -133,6 +139,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries ...@@ -133,6 +139,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
if (pMsg->success) { if (pMsg->success) {
// nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1] // nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1]
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1); syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1);
sTrace("update next index:%ld, success:%d", pMsg->matchIndex + 1, pMsg->success);
// matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex] // matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex]
syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex); syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex);
...@@ -144,6 +151,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries ...@@ -144,6 +151,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
} else { } else {
SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
sTrace("begin to update next index:%ld, success:%d", nextIndex, pMsg->success);
// notice! int64, uint64 // notice! int64, uint64
if (nextIndex > SYNC_INDEX_BEGIN) { if (nextIndex > SYNC_INDEX_BEGIN) {
...@@ -180,11 +188,19 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries ...@@ -180,11 +188,19 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
} else { } else {
nextIndex = SYNC_INDEX_BEGIN; nextIndex = SYNC_INDEX_BEGIN;
} }
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex); syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex);
sTrace("update next index:%ld, success:%d", nextIndex, pMsg->success);
} }
syncIndexMgrLog2("recv SyncAppendEntriesReply, after pNextIndex:", ths->pNextIndex); syncIndexMgrLog2("recv SyncAppendEntriesReply, after pNextIndex:", ths->pNextIndex);
syncIndexMgrLog2("recv SyncAppendEntriesReply, after pMatchIndex:", ths->pMatchIndex); syncIndexMgrLog2("recv SyncAppendEntriesReply, after pMatchIndex:", ths->pMatchIndex);
{
SSnapshot snapshot;
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);
sTrace("recv SyncAppendEntriesReply, after snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu",
snapshot.lastApplyIndex, snapshot.lastApplyTerm);
}
return ret; return ret;
} }
\ No newline at end of file
...@@ -122,6 +122,12 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) { ...@@ -122,6 +122,12 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
syncIndexMgrLog2("begin append entries peers pNextIndex:", pSyncNode->pNextIndex); syncIndexMgrLog2("begin append entries peers pNextIndex:", pSyncNode->pNextIndex);
syncIndexMgrLog2("begin append entries peers pMatchIndex:", pSyncNode->pMatchIndex); syncIndexMgrLog2("begin append entries peers pMatchIndex:", pSyncNode->pMatchIndex);
logStoreSimpleLog2("begin append entries peers LogStore:", pSyncNode->pLogStore); logStoreSimpleLog2("begin append entries peers LogStore:", pSyncNode->pLogStore);
{
SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
sTrace("begin append entries peers, snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu",
snapshot.lastApplyIndex, snapshot.lastApplyTerm);
}
int32_t ret = 0; int32_t ret = 0;
for (int i = 0; i < pSyncNode->peersNum; ++i) { for (int i = 0; i < pSyncNode->peersNum; ++i) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册