diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index ccdf09948206ce9bc7e7b99a8ab0f00c6ed1665b..f34c5fffb063387a41316090b706d56175e72a2d 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -39,14 +39,15 @@ typedef struct SSyncSnapshotSender { bool start; int32_t seq; int32_t ack; - void * pReader; - void * pCurrentBlock; + void *pReader; + void *pCurrentBlock; int32_t blockLen; SSnapshot snapshot; int64_t sendingMS; SSyncNode *pSyncNode; int32_t replicaIndex; SyncTerm term; + bool finish; } SSyncSnapshotSender; SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex); @@ -55,14 +56,14 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender); void snapshotSenderStop(SSyncSnapshotSender *pSender); int32_t snapshotSend(SSyncSnapshotSender *pSender); int32_t snapshotReSend(SSyncSnapshotSender *pSender); -cJSON * snapshotSender2Json(SSyncSnapshotSender *pSender); -char * snapshotSender2Str(SSyncSnapshotSender *pSender); +cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender); +char *snapshotSender2Str(SSyncSnapshotSender *pSender); typedef struct SSyncSnapshotReceiver { bool start; int32_t ack; - void * pWriter; + void *pWriter; SyncTerm term; SSyncNode *pSyncNode; @@ -73,8 +74,8 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, int32_t repl void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver); void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver); void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver); -cJSON * snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); -char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); +cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); +char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); int32_t syncNodeOnSnapshotSendCb(SSyncNode *ths, SyncSnapshotSend *pMsg); int32_t syncNodeOnSnapshotRspCb(SSyncNode *ths, SyncSnapshotRsp *pMsg); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index db38a46e31978cd4d3e44f8561a47f874acbccc8..4ff3b95890f783e394e8e9a7cd252ddca9b59ae9 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -471,12 +471,18 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs ASSERT(pMsg->prevLogIndex == snapshot.lastApplyIndex); logOK = (pMsg->prevLogIndex == snapshot.lastApplyIndex) && (pMsg->prevLogTerm == snapshot.lastApplyTerm); + sTrace( + "1 - logOK:%d, pMsg->prevLogIndex:%ld, snapshot.lastApplyIndex:%ld, pMsg->prevLogTerm:%lu, " + "snapshot.lastApplyTerm:%lu", + logOK, pMsg->prevLogIndex, snapshot.lastApplyIndex, pMsg->prevLogTerm, snapshot.lastApplyTerm); } else { logOK = (pMsg->prevLogIndex == SYNC_INDEX_INVALID) || ((pMsg->prevLogIndex >= SYNC_INDEX_BEGIN) && (pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) && (pMsg->prevLogTerm == localPreLogTerm)); + sTrace("2 - logOK:%d, pMsg->prevLogIndex:%ld, getLastIndex:%ld, pMsg->prevLogTerm:%lu, localPreLogTerm:%lu", logOK, + pMsg->prevLogIndex, ths->pLogStore->getLastIndex(ths->pLogStore), pMsg->prevLogTerm, localPreLogTerm); } // reject request diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index 5915380c1a2d5351006faf77eab25388706d38c5..3c694328378633fa2354a8d034990a3ddf7e3941 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -75,7 +75,8 @@ int32_t syncIOSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { syncUtilMsgNtoH(pMsg->pCont); char logBuf[256] = {0}; - snprintf(logBuf, sizeof(logBuf), "==syncIOSendMsg== %s:%d msgType:%d", pEpSet->eps[0].fqdn, pEpSet->eps[0].port, pMsg->msgType); + snprintf(logBuf, sizeof(logBuf), "==syncIOSendMsg== %s:%d msgType:%d", pEpSet->eps[0].fqdn, pEpSet->eps[0].port, + pMsg->msgType); syncRpcMsgLog2(logBuf, pMsg); syncUtilMsgHtoN(pMsg->pCont); diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index b645fa9ac8c9d90537ca0cb617841dd94a2f8610..83fbb0aa97d65f81ba1a9eb36ea8fdcbf3f8dc06 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -134,7 +134,22 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) { // batch optimized // SyncIndex lastIndex = syncUtilMinIndex(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore), nextIndex); - if (syncNodeIsIndexInSnapshot(pSyncNode, nextIndex)) { + // sending snapshot finish? + bool snapshotSendingFinish = false; + SSyncSnapshotSender* pSender = NULL; + for (int i = 0; i < pSyncNode->replicaNum; ++i) { + if (syncUtilSameId(pDestId, &((pSyncNode->replicasId)[i]))) { + pSender = (pSyncNode->senders)[i]; + } + } + ASSERT(pSender != NULL); + snapshotSendingFinish = (pSender->finish) && (pSender->term == pSyncNode->pRaftStore->currentTerm); + if (snapshotSendingFinish) { + sInfo("snapshotSendingFinish! term:%lu", pSender->term); + } + + if ((syncNodeIsIndexInSnapshot(pSyncNode, nextIndex - 1) && !snapshotSendingFinish) || + syncNodeIsIndexInSnapshot(pSyncNode, nextIndex)) { // will send this msg until snapshot receive finish! SSnapshot snapshot; pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); @@ -142,14 +157,10 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) { snapshot.lastApplyIndex, snapshot.lastApplyTerm); // do not use next index - // always send from new last index + 1 - SyncIndex lastIndex; - SyncTerm lastTerm; - ret = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm); - ASSERT(ret == 0); + // always send from snapshot.lastApplyIndex + 1, and wait for snapshot transfer finish - ret = syncNodeGetPreIndexTerm(pSyncNode, lastIndex + 1, &preLogIndex, &preLogTerm); - ASSERT(ret == 0); + preLogIndex = snapshot.lastApplyIndex; + preLogTerm = snapshot.lastApplyTerm; // to claim leader SyncAppendEntries* pMsg = syncAppendEntriesBuild(0, pSyncNode->vgId); diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index e06f1a9ec4ad20307942fe7a99626bdf0056d9a9..e423d4d36913d50e8f29a3006d43f251525734b9 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -40,6 +40,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI pSender->pSyncNode = pSyncNode; pSender->replicaIndex = replicaIndex; pSender->term = pSyncNode->pRaftStore->currentTerm; + pSender->finish = false; } else { sInfo("snapshotSenderCreate cannot create sender"); } @@ -270,6 +271,7 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) { cJSON_AddNumberToObject(pRoot, "replicaIndex", pSender->replicaIndex); snprintf(u64buf, sizeof(u64buf), "%lu", pSender->term); cJSON_AddStringToObject(pRoot, "term", u64buf); + cJSON_AddNumberToObject(pRoot, "finish", pSender->finish); } cJSON *pJson = cJSON_CreateObject(); @@ -435,7 +437,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { pReceiver->pWriter = NULL; snapshotReceiverStop(pReceiver); pReceiver->ack = pMsg->seq; - needRsp = false; + needRsp = true; char *msgStr = syncSnapshotSend2Str(pMsg); sTrace("snapshot recv end ack:%d recv msg:%s", pReceiver->ack, msgStr); @@ -506,6 +508,7 @@ int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) { if (pMsg->term == pSyncNode->pRaftStore->currentTerm) { // receiver ack is finish, close sender if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) { + pSender->finish = true; snapshotSenderStop(pSender); return 0; } diff --git a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp index ba7199f9b0013374c48eb33d1a70000ced46d878..781c168da9fbe756f48c65e58cb48bad66b60d32 100644 --- a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp +++ b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp @@ -129,7 +129,7 @@ int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply) { sTrace("%s", logBuf); if (isApply) { - gSnapshotLastApplyIndex = 7; + gSnapshotLastApplyIndex = 10; gSnapshotLastApplyTerm = 1; }