From b42c963a5f7085b7dac21076de7996a33d187ce2 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 16 Jun 2022 20:08:27 +0800 Subject: [PATCH] fix(sync): fake match when prevLogIndex less than commitIndex --- source/libs/sync/src/syncAppendEntries.c | 26 +++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 4ebd279668..7a4edb668a 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -880,13 +880,14 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs } } while (0); - // fake match + // fake match2 // // condition1: // preIndex <= my commit index // // operation: - // match my commit index + // if hasAppendEntries && pMsg->prevLogIndex == ths->commitIndex, append entry + // match my-commit-index or my-commit-index + 1 // no operation on log do { bool condition = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && @@ -895,6 +896,25 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs sTrace("recv SyncAppendEntries, fake match2, msg-prevLogIndex:%ld, my-commitIndex:%ld", pMsg->prevLogIndex, ths->commitIndex); + SyncIndex matchIndex = ths->commitIndex; + bool hasAppendEntries = pMsg->dataLen > 0; + if (hasAppendEntries && pMsg->prevLogIndex == ths->commitIndex) { + // append entry + SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); + ASSERT(pAppendEntry != NULL); + + code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); + ASSERT(code == 0); + + // pre commit + code = syncNodePreCommit(ths, pAppendEntry); + ASSERT(code == 0); + + matchIndex = pMsg->prevLogIndex + 1; + + syncEntryDestory(pAppendEntry); + } + // prepare response msg SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); pReply->srcId = ths->myRaftId; @@ -902,7 +922,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs pReply->term = ths->pRaftStore->currentTerm; pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; pReply->success = true; - pReply->matchIndex = ths->commitIndex; + pReply->matchIndex = matchIndex; // send response SRpcMsg rpcMsg; -- GitLab