From d04c12713ac5577f9bf8f26da3d2f3bef968d1b2 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 2 Jun 2022 11:36:26 +0800 Subject: [PATCH] fix: send snapshot --- source/libs/sync/inc/syncSnapshot.h | 15 ++++++----- source/libs/sync/src/syncAppendEntries.c | 6 +++++ source/libs/sync/src/syncIO.c | 3 ++- source/libs/sync/src/syncReplication.c | 27 +++++++++++++------ source/libs/sync/src/syncSnapshot.c | 5 +++- .../test/syncConfigChangeSnapshotTest.cpp | 2 +- 6 files changed, 40 insertions(+), 18 deletions(-) diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index ccdf099482..f34c5fffb0 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -39,14 +39,15 @@ typedef struct SSyncSnapshotSender { bool start; int32_t seq; int32_t ack; - void * pReader; - void * pCurrentBlock; + void *pReader; + void *pCurrentBlock; int32_t blockLen; SSnapshot snapshot; int64_t sendingMS; SSyncNode *pSyncNode; int32_t replicaIndex; SyncTerm term; + bool finish; } SSyncSnapshotSender; SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex); @@ -55,14 +56,14 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender); void snapshotSenderStop(SSyncSnapshotSender *pSender); int32_t snapshotSend(SSyncSnapshotSender *pSender); int32_t snapshotReSend(SSyncSnapshotSender *pSender); -cJSON * snapshotSender2Json(SSyncSnapshotSender *pSender); -char * snapshotSender2Str(SSyncSnapshotSender *pSender); +cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender); +char *snapshotSender2Str(SSyncSnapshotSender *pSender); typedef struct SSyncSnapshotReceiver { bool start; int32_t ack; - void * pWriter; + void *pWriter; SyncTerm term; SSyncNode *pSyncNode; @@ -73,8 +74,8 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, int32_t repl void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver); void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver); void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver); -cJSON * snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); -char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); +cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); +char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); int32_t syncNodeOnSnapshotSendCb(SSyncNode *ths, SyncSnapshotSend *pMsg); int32_t syncNodeOnSnapshotRspCb(SSyncNode *ths, SyncSnapshotRsp *pMsg); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index db38a46e31..4ff3b95890 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -471,12 +471,18 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs ASSERT(pMsg->prevLogIndex == snapshot.lastApplyIndex); logOK = (pMsg->prevLogIndex == snapshot.lastApplyIndex) && (pMsg->prevLogTerm == snapshot.lastApplyTerm); + sTrace( + "1 - logOK:%d, pMsg->prevLogIndex:%ld, snapshot.lastApplyIndex:%ld, pMsg->prevLogTerm:%lu, " + "snapshot.lastApplyTerm:%lu", + logOK, pMsg->prevLogIndex, snapshot.lastApplyIndex, pMsg->prevLogTerm, snapshot.lastApplyTerm); } else { logOK = (pMsg->prevLogIndex == SYNC_INDEX_INVALID) || ((pMsg->prevLogIndex >= SYNC_INDEX_BEGIN) && (pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) && (pMsg->prevLogTerm == localPreLogTerm)); + sTrace("2 - logOK:%d, pMsg->prevLogIndex:%ld, getLastIndex:%ld, pMsg->prevLogTerm:%lu, localPreLogTerm:%lu", logOK, + pMsg->prevLogIndex, ths->pLogStore->getLastIndex(ths->pLogStore), pMsg->prevLogTerm, localPreLogTerm); } // reject request diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index 5915380c1a..3c69432837 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -75,7 +75,8 @@ int32_t syncIOSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { syncUtilMsgNtoH(pMsg->pCont); char logBuf[256] = {0}; - snprintf(logBuf, sizeof(logBuf), "==syncIOSendMsg== %s:%d msgType:%d", pEpSet->eps[0].fqdn, pEpSet->eps[0].port, pMsg->msgType); + snprintf(logBuf, sizeof(logBuf), "==syncIOSendMsg== %s:%d msgType:%d", pEpSet->eps[0].fqdn, pEpSet->eps[0].port, + pMsg->msgType); syncRpcMsgLog2(logBuf, pMsg); syncUtilMsgHtoN(pMsg->pCont); diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index b645fa9ac8..83fbb0aa97 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -134,7 +134,22 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) { // batch optimized // SyncIndex lastIndex = syncUtilMinIndex(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore), nextIndex); - if (syncNodeIsIndexInSnapshot(pSyncNode, nextIndex)) { + // sending snapshot finish? + bool snapshotSendingFinish = false; + SSyncSnapshotSender* pSender = NULL; + for (int i = 0; i < pSyncNode->replicaNum; ++i) { + if (syncUtilSameId(pDestId, &((pSyncNode->replicasId)[i]))) { + pSender = (pSyncNode->senders)[i]; + } + } + ASSERT(pSender != NULL); + snapshotSendingFinish = (pSender->finish) && (pSender->term == pSyncNode->pRaftStore->currentTerm); + if (snapshotSendingFinish) { + sInfo("snapshotSendingFinish! term:%lu", pSender->term); + } + + if ((syncNodeIsIndexInSnapshot(pSyncNode, nextIndex - 1) && !snapshotSendingFinish) || + syncNodeIsIndexInSnapshot(pSyncNode, nextIndex)) { // will send this msg until snapshot receive finish! SSnapshot snapshot; pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); @@ -142,14 +157,10 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) { snapshot.lastApplyIndex, snapshot.lastApplyTerm); // do not use next index - // always send from new last index + 1 - SyncIndex lastIndex; - SyncTerm lastTerm; - ret = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm); - ASSERT(ret == 0); + // always send from snapshot.lastApplyIndex + 1, and wait for snapshot transfer finish - ret = syncNodeGetPreIndexTerm(pSyncNode, lastIndex + 1, &preLogIndex, &preLogTerm); - ASSERT(ret == 0); + preLogIndex = snapshot.lastApplyIndex; + preLogTerm = snapshot.lastApplyTerm; // to claim leader SyncAppendEntries* pMsg = syncAppendEntriesBuild(0, pSyncNode->vgId); diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index e06f1a9ec4..e423d4d369 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -40,6 +40,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI pSender->pSyncNode = pSyncNode; pSender->replicaIndex = replicaIndex; pSender->term = pSyncNode->pRaftStore->currentTerm; + pSender->finish = false; } else { sInfo("snapshotSenderCreate cannot create sender"); } @@ -270,6 +271,7 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) { cJSON_AddNumberToObject(pRoot, "replicaIndex", pSender->replicaIndex); snprintf(u64buf, sizeof(u64buf), "%lu", pSender->term); cJSON_AddStringToObject(pRoot, "term", u64buf); + cJSON_AddNumberToObject(pRoot, "finish", pSender->finish); } cJSON *pJson = cJSON_CreateObject(); @@ -435,7 +437,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { pReceiver->pWriter = NULL; snapshotReceiverStop(pReceiver); pReceiver->ack = pMsg->seq; - needRsp = false; + needRsp = true; char *msgStr = syncSnapshotSend2Str(pMsg); sTrace("snapshot recv end ack:%d recv msg:%s", pReceiver->ack, msgStr); @@ -506,6 +508,7 @@ int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) { if (pMsg->term == pSyncNode->pRaftStore->currentTerm) { // receiver ack is finish, close sender if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) { + pSender->finish = true; snapshotSenderStop(pSender); return 0; } diff --git a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp index ba7199f9b0..781c168da9 100644 --- a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp +++ b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp @@ -129,7 +129,7 @@ int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply) { sTrace("%s", logBuf); if (isApply) { - gSnapshotLastApplyIndex = 7; + gSnapshotLastApplyIndex = 10; gSnapshotLastApplyTerm = 1; } -- GitLab