From a89e45526759e46dce516ed7cafe9f8d79211342 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 7 Jun 2022 09:55:18 +0800 Subject: [PATCH] fix(sync): delete some assert temporary --- source/libs/sync/inc/syncInt.h | 2 ++ source/libs/sync/src/syncAppendEntriesReply.c | 8 +++++++- source/libs/sync/src/syncMain.c | 17 ++++++++++++++--- 3 files changed, 23 insertions(+), 4 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index ba7e35ec5b..076526e6c9 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -225,6 +225,8 @@ int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncInd int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag); +bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId); + // for debug -------------- void syncNodePrint(SSyncNode* pObj); void syncNodePrint2(char* s, SSyncNode* pObj); diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index f5bcda1955..1e587dac92 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -104,6 +104,12 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries snprintf(logBuf, sizeof(logBuf), "recv SyncAppendEntriesReply, term:%lu", ths->pRaftStore->currentTerm); syncAppendEntriesReplyLog2(logBuf, pMsg); + // if already drop replica, do not process + if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) { + sInfo("maybe already dropped"); + return ret; + } + // drop stale response if (pMsg->term < ths->pRaftStore->currentTerm) { sTrace("recv SyncAppendEntriesReply, drop stale response, receive_term:%lu current_term:%lu", pMsg->term, @@ -162,7 +168,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries SSnapshot snapshot; ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); if (nextIndex <= snapshot.lastApplyIndex) { - ASSERT(nextIndex == snapshot.lastApplyIndex); + // ASSERT(nextIndex == snapshot.lastApplyIndex); nextIndex = snapshot.lastApplyIndex + 1; sInfo("reset new nextIndex %ld, snapshot.lastApplyIndex:%ld", nextIndex, snapshot.lastApplyIndex); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 2aafbb1fb5..951da64b7c 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1347,7 +1347,7 @@ SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) { pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); } - ASSERT(index > snapshot.lastApplyIndex); + // ASSERT(index > snapshot.lastApplyIndex); preIndex = index - 1; } else { @@ -1375,7 +1375,7 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) { pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); } - ASSERT(index > snapshot.lastApplyIndex); + // ASSERT(index > snapshot.lastApplyIndex); if (index > snapshot.lastApplyIndex + 1) { // should be log preTerm SSyncRaftEntry* pPreEntry = NULL; @@ -1389,7 +1389,9 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) { } else if (index == snapshot.lastApplyIndex + 1) { preTerm = snapshot.lastApplyTerm; } else { - ASSERT(0); + // ASSERT(0); + // maybe snapshot change + preTerm = snapshot.lastApplyTerm; } } else { @@ -1799,4 +1801,13 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, } } return 0; +} + +bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) { + for (int i = 0; i < ths->replicaNum; ++i) { + if (syncUtilSameId(&((ths->replicasId)[i]), pRaftId)) { + return true; + } + } + return false; } \ No newline at end of file -- GitLab