From eba8f2b543d0eaab35a573c8557734e766a624b7 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 7 Jun 2022 19:20:05 +0800 Subject: [PATCH] fix(sync) send snapshot multi-times --- source/libs/sync/inc/syncIndexMgr.h | 8 ++++---- source/libs/sync/src/syncAppendEntries.c | 5 +++-- source/libs/sync/src/syncAppendEntriesReply.c | 20 +++++++++++++++++++ source/libs/sync/src/syncReplication.c | 3 ++- source/libs/sync/src/syncSnapshot.c | 17 +++++++++++----- source/libs/sync/test/syncIndexMgrTest.cpp | 10 +++++----- 6 files changed, 46 insertions(+), 17 deletions(-) diff --git a/source/libs/sync/inc/syncIndexMgr.h b/source/libs/sync/inc/syncIndexMgr.h index 1983d407df..e11094e73a 100644 --- a/source/libs/sync/inc/syncIndexMgr.h +++ b/source/libs/sync/inc/syncIndexMgr.h @@ -41,11 +41,11 @@ void syncIndexMgrDestroy(SSyncIndexMgr *pSyncIndexMgr); void syncIndexMgrClear(SSyncIndexMgr *pSyncIndexMgr); void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncIndex index); SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId); -cJSON * syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr); -char * syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr); +cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr); +char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr); -void syncIndexMgrSetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncTerm term); -SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId); +// void syncIndexMgrSetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncTerm term); +// SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId); // for debug ------------------- void syncIndexMgrPrint(SSyncIndexMgr *pObj); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 00331b3782..41bf7ac778 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -18,6 +18,7 @@ #include "syncRaftCfg.h" #include "syncRaftLog.h" #include "syncRaftStore.h" +#include "syncSnapshot.h" #include "syncUtil.h" #include "syncVoteMgr.h" #include "wal.h" @@ -576,7 +577,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs pReply->term = ths->pRaftStore->currentTerm; pReply->success = false; pReply->matchIndex = SYNC_INDEX_INVALID; - pReply->privateTerm = pMsg->privateTerm; + pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; SRpcMsg rpcMsg; syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); @@ -661,7 +662,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs pReply->destId = pMsg->srcId; pReply->term = ths->pRaftStore->currentTerm; pReply->success = true; - pReply->privateTerm = pMsg->privateTerm; + pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; if (hasAppendEntries) { pReply->matchIndex = pMsg->prevLogIndex + 1; diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 1a74b0dd72..94406c458e 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -175,6 +175,26 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries ASSERT(pSender != NULL); SyncIndex sentryIndex; + if (pSender->start && pSender->term == ths->pRaftStore->currentTerm) { + // already start + sentryIndex = pSender->snapshot.lastApplyIndex; + sTrace( + "sending snapshot already start: pSender->term:%lu, ths->pRaftStore->currentTerm:%lu, " + "pSender->privateTerm:%lu", + pSender->term, ths->pRaftStore->currentTerm, pSender->privateTerm); + + } else { + if (pMsg->privateTerm >= pSender->privateTerm) { + // donot start again + sentryIndex = pSender->snapshot.lastApplyIndex; + + } else { + // start first time + snapshotSenderDoStart(pSender); + pSender->start = true; + sentryIndex = pSender->snapshot.lastApplyIndex; + } + } #if 0 SyncIndex sentryIndex; if (pSender->start && pSender->term == ths->pRaftStore->currentTerm) { diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 598260f390..ff39b0b13d 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -177,7 +177,8 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) { pMsg->prevLogIndex = preLogIndex; pMsg->prevLogTerm = preLogTerm; pMsg->commitIndex = pSyncNode->commitIndex; - pMsg->privateTerm = syncIndexMgrGetTerm(pSyncNode->pNextIndex, pDestId); + pMsg->privateTerm = 0; + // pMsg->privateTerm = syncIndexMgrGetTerm(pSyncNode->pNextIndex, pDestId); // send msg syncNodeAppendEntries(pSyncNode, pDestId, pMsg); diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 4b60939edf..5715354de0 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -43,7 +43,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI pSender->pSyncNode = pSyncNode; pSender->replicaIndex = replicaIndex; pSender->term = pSyncNode->pRaftStore->currentTerm; - pSender->privateTerm = 0; + pSender->privateTerm = taosGetTimestampMs() + 100; pSender->pSyncNode->pFsm->FpGetSnapshot(pSender->pSyncNode->pFsm, &(pSender->snapshot)); pSender->finish = false; @@ -62,6 +62,9 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender) { // begin send snapshot (current term, seq begin) void snapshotSenderDoStart(SSyncSnapshotSender *pSender) { + // when start, increase term + ++(pSender->privateTerm); + pSender->term = pSender->pSyncNode->pRaftStore->currentTerm; pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN; pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID; @@ -82,6 +85,7 @@ void snapshotSenderDoStart(SSyncSnapshotSender *pSender) { pMsg->lastIndex = pSender->snapshot.lastApplyIndex; pMsg->lastTerm = pSender->snapshot.lastApplyTerm; pMsg->seq = pSender->seq; // SYNC_SNAPSHOT_SEQ_BEGIN + pMsg->privateTerm = pSender->privateTerm; // send SRpcMsg rpcMsg; @@ -92,9 +96,6 @@ void snapshotSenderDoStart(SSyncSnapshotSender *pSender) { sTrace("snapshot send begin seq:%d ack:%d send msg:%s", pSender->seq, pSender->ack, msgStr); taosMemoryFree(msgStr); - // when start, increase term - ++(pSender->privateTerm); - syncSnapshotSendDestroy(pMsg); } @@ -198,6 +199,7 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { pMsg->lastIndex = pSender->snapshot.lastApplyIndex; pMsg->lastTerm = pSender->snapshot.lastApplyTerm; pMsg->seq = pSender->seq; + pMsg->privateTerm = pSender->privateTerm; memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen); SRpcMsg rpcMsg; @@ -316,6 +318,8 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, int32_t repl pReceiver->pSyncNode = pSyncNode; pReceiver->replicaIndex = replicaIndex; pReceiver->term = pSyncNode->pRaftStore->currentTerm; + pReceiver->privateTerm = 0; + } else { sInfo("snapshotReceiverCreate cannot create receiver"); } @@ -377,6 +381,7 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) { } pReceiver->start = false; + ++(pReceiver->privateTerm); char *s = snapshotReceiver2Str(pReceiver); sInfo("snapshotReceiverStop %s", s); @@ -429,6 +434,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { // begin snapshotReceiverStart(pReceiver); pReceiver->ack = pMsg->seq; + pReceiver->privateTerm = pMsg->privateTerm; needRsp = true; char *msgStr = syncSnapshotSend2Str(pMsg); @@ -490,6 +496,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { pRspMsg->lastIndex = pMsg->lastIndex; pRspMsg->lastTerm = pMsg->lastTerm; pRspMsg->ack = pReceiver->ack; + pRspMsg->privateTerm = pReceiver->privateTerm; SRpcMsg rpcMsg; syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg); @@ -526,7 +533,7 @@ int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) { snapshotSenderStop(pSender); // update nextIndex private term - syncIndexMgrSetTerm(pSyncNode->pNextIndex, &(pMsg->srcId), pSender->privateTerm); + // syncIndexMgrSetTerm(pSyncNode->pNextIndex, &(pMsg->srcId), pSender->privateTerm); return 0; } diff --git a/source/libs/sync/test/syncIndexMgrTest.cpp b/source/libs/sync/test/syncIndexMgrTest.cpp index e920ae3ac6..7fb80e9d58 100644 --- a/source/libs/sync/test/syncIndexMgrTest.cpp +++ b/source/libs/sync/test/syncIndexMgrTest.cpp @@ -66,9 +66,9 @@ int main(int argc, char** argv) { syncIndexMgrSetIndex(pSyncIndexMgr, &ids[0], 100); syncIndexMgrSetIndex(pSyncIndexMgr, &ids[1], 200); syncIndexMgrSetIndex(pSyncIndexMgr, &ids[2], 300); - syncIndexMgrSetTerm(pSyncIndexMgr, &ids[0], 700); - syncIndexMgrSetTerm(pSyncIndexMgr, &ids[1], 800); - syncIndexMgrSetTerm(pSyncIndexMgr, &ids[2], 900); + //syncIndexMgrSetTerm(pSyncIndexMgr, &ids[0], 700); + //syncIndexMgrSetTerm(pSyncIndexMgr, &ids[1], 800); + //syncIndexMgrSetTerm(pSyncIndexMgr, &ids[2], 900); { char* serialized = syncIndexMgr2Str(pSyncIndexMgr); assert(serialized != NULL); @@ -80,8 +80,8 @@ int main(int argc, char** argv) { printf("---------------------------------------\n"); for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) { SyncIndex idx = syncIndexMgrGetIndex(pSyncIndexMgr, &ids[i]); - SyncTerm term = syncIndexMgrGetTerm(pSyncIndexMgr, &ids[i]); - printf("%d: index:%ld term:%lu \n", i, idx, term); + //SyncTerm term = syncIndexMgrGetTerm(pSyncIndexMgr, &ids[i]); + //printf("%d: index:%ld term:%lu \n", i, idx, term); } printf("---------------------------------------\n"); -- GitLab