From 29b97fa0ad843c12f3a5d84cf1966cb4c1398e10 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 8 Jun 2022 19:53:07 +0800 Subject: [PATCH] refactor(sync): syncNodeOnAppendEntriesReplySnapshotCb --- include/libs/sync/syncTools.h | 5 +- source/libs/sync/inc/syncSnapshot.h | 6 +- source/libs/sync/src/syncAppendEntriesReply.c | 69 ++++++++----------- source/libs/sync/src/syncMessage.c | 1 + source/libs/sync/src/syncSnapshot.c | 60 ++++++++-------- source/libs/sync/test/syncSnapshotRspTest.cpp | 1 + 6 files changed, 66 insertions(+), 76 deletions(-) diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index dcbb76fe49..27437ae3c2 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -398,7 +398,7 @@ typedef struct SyncSnapshotSend { SyncTerm term; SyncIndex lastIndex; // lastIndex of snapshot SyncTerm lastTerm; // lastTerm of snapshot - SyncTerm privateTerm; + SyncTerm privateTerm; int32_t seq; uint32_t dataLen; char data[]; @@ -433,8 +433,9 @@ typedef struct SyncSnapshotRsp { SyncTerm term; SyncIndex lastIndex; SyncTerm lastTerm; - SyncTerm privateTerm; + SyncTerm privateTerm; int32_t ack; + int32_t code; } SyncSnapshotRsp; SyncSnapshotRsp* syncSnapshotRspBuild(int32_t vgId); diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index 63e69f77ca..b16e47b51e 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -48,7 +48,7 @@ typedef struct SSyncSnapshotSender { int32_t replicaIndex; SyncTerm term; SyncTerm privateTerm; - bool apply; + bool finish; } SSyncSnapshotSender; SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex); @@ -75,9 +75,9 @@ typedef struct SSyncSnapshotReceiver { SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, int32_t replicaIndex); void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver); -void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver); +void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm); bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver); -void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver); +void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply); cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index e6bef88a4c..a14ce3607f 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -101,7 +101,8 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries // print log char logBuf[128] = {0}; - snprintf(logBuf, sizeof(logBuf), "recv SyncAppendEntriesReply, term:%lu", ths->pRaftStore->currentTerm); + snprintf(logBuf, sizeof(logBuf), "recv SyncAppendEntriesReply, vgId:%d, term:%lu", ths->vgId, + ths->pRaftStore->currentTerm); syncAppendEntriesReplyLog2(logBuf, pMsg); // if already drop replica, do not process @@ -145,7 +146,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries if (pMsg->success) { // nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1] syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1); - sTrace("update next index:%ld, success:%d", pMsg->matchIndex + 1, pMsg->success); + sTrace("update next match, index:%ld, success:%d", pMsg->matchIndex + 1, pMsg->success); // matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex] syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex); @@ -157,49 +158,35 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries } else { SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); - sTrace("begin to update next index:%ld, success:%d", nextIndex, pMsg->success); + sTrace("update next not match, begin, index:%ld, success:%d", nextIndex, pMsg->success); // notice! int64, uint64 if (nextIndex > SYNC_INDEX_BEGIN) { --nextIndex; - // has snapshot - if (syncNodeHasSnapshot(ths)) { - // get sender - SSyncSnapshotSender* pSender = NULL; - for (int i = 0; i < ths->replicaNum; ++i) { - if (syncUtilSameId(&(pMsg->srcId), &((ths->replicasId)[i]))) { - pSender = (ths->senders)[i]; - } - } - ASSERT(pSender != NULL); - - SyncIndex sentryIndex; - if (pSender->start && pSender->term == ths->pRaftStore->currentTerm) { - // already start - sentryIndex = pSender->snapshot.lastApplyIndex; - sTrace( - "sending snapshot already start: pSender->term:%lu, ths->pRaftStore->currentTerm:%lu, " - "pSender->privateTerm:%lu", - pSender->term, ths->pRaftStore->currentTerm, pSender->privateTerm); - - } else { - if (pMsg->privateTerm >= pSender->privateTerm) { - // donot start again - sentryIndex = pSender->snapshot.lastApplyIndex; - - } else { - // start first time - snapshotSenderStart(pSender); - pSender->start = true; - sentryIndex = pSender->snapshot.lastApplyIndex; - } - } - - // update nextIndex to sentryIndex + 1 - if (nextIndex <= sentryIndex) { - nextIndex = sentryIndex + 1; - } + // get sender + SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId)); + ASSERT(pSender != NULL); + bool hasSnapshot = syncNodeHasSnapshot(ths); + SSnapshot snapshot; + ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); + + // start sending snapshot first time + // start here, stop by receiver + if (hasSnapshot && nextIndex <= snapshot.lastApplyIndex + 1 && !snapshotSenderIsStart(pSender) && + pMsg->privateTerm < pSender->privateTerm) { + snapshotSenderStart(pSender); + + char* s = snapshotSender2Str(pSender); + sInfo("snapshot send, start sender first time, sender:%s", s); + taosMemoryFree(s); + } + + SyncIndex sentryIndex = pSender->snapshot.lastApplyIndex + 1; + + // update nextIndex to sentryIndex + if (nextIndex <= sentryIndex) { + nextIndex = sentryIndex; } } else { @@ -207,7 +194,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries } syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex); - sTrace("update next index:%ld, success:%d", nextIndex, pMsg->success); + sTrace("update next not match, end, index:%ld, success:%d", nextIndex, pMsg->success); } syncIndexMgrLog2("recv SyncAppendEntriesReply, after pNextIndex:", ths->pNextIndex); diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 221a8e1af2..056c2d5c5b 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -1991,6 +1991,7 @@ cJSON* syncSnapshotRsp2Json(const SyncSnapshotRsp* pMsg) { cJSON_AddStringToObject(pRoot, "lastTerm", u64buf); cJSON_AddNumberToObject(pRoot, "ack", pMsg->ack); + cJSON_AddNumberToObject(pRoot, "code", pMsg->code); } cJSON* pJson = cJSON_CreateObject(); diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 1d52d9be33..6e546ecedc 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -20,8 +20,9 @@ #include "syncUtil.h" #include "wal.h" -static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver); +static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm); +//---------------------------------- SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) { bool condition = (pSyncNode->pFsm->FpSnapshotStartRead != NULL) && (pSyncNode->pFsm->FpSnapshotStopRead != NULL) && (pSyncNode->pFsm->FpSnapshotDoRead != NULL); @@ -44,7 +45,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI pSender->term = pSyncNode->pRaftStore->currentTerm; pSender->privateTerm = taosGetTimestampMs() + 100; pSender->pSyncNode->pFsm->FpGetSnapshot(pSender->pSyncNode->pFsm, &(pSender->snapshot)); - pSender->apply = false; + pSender->finish = false; } else { sError("snapshotSenderCreate cannot create sender"); } @@ -86,7 +87,7 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) { pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS; pSender->term = pSender->pSyncNode->pRaftStore->currentTerm; ++(pSender->privateTerm); - pSender->apply = false; + pSender->finish = false; pSender->start = true; // build begin msg @@ -299,7 +300,7 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) { cJSON_AddStringToObject(pRoot, "term", u64buf); snprintf(u64buf, sizeof(u64buf), "%lu", pSender->privateTerm); cJSON_AddStringToObject(pRoot, "privateTerm", u64buf); - cJSON_AddNumberToObject(pRoot, "apply", pSender->apply); + cJSON_AddNumberToObject(pRoot, "finish", pSender->finish); } cJSON *pJson = cJSON_CreateObject(); @@ -349,8 +350,9 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) { bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) { return pReceiver->start; } // begin receive snapshot msg (current term, seq begin) -static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver) { +static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm) { pReceiver->term = pReceiver->pSyncNode->pRaftStore->currentTerm; + pReceiver->privateTerm = privateTerm; pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN; ASSERT(pReceiver->pWriter == NULL); @@ -360,10 +362,10 @@ static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver) { // if receiver receive msg from seq = SYNC_SNAPSHOT_SEQ_BEGIN, start receiver // if already start, force close, start again -void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver) { +void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm) { if (!snapshotReceiverIsStart(pReceiver)) { // start - snapshotReceiverDoStart(pReceiver); + snapshotReceiverDoStart(pReceiver, privateTerm); pReceiver->start = true; } else { @@ -376,7 +378,7 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver) { pReceiver->pWriter = NULL; // start again - snapshotReceiverDoStart(pReceiver); + snapshotReceiverDoStart(pReceiver, privateTerm); pReceiver->start = true; ASSERT(0); @@ -387,7 +389,7 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver) { taosMemoryFree(s); } -void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) { +void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply) { if (pReceiver->pWriter != NULL) { int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false); @@ -396,7 +398,10 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) { } pReceiver->start = false; - ++(pReceiver->privateTerm); + + if (apply) { + ++(pReceiver->privateTerm); + } char *s = snapshotReceiver2Str(pReceiver); sInfo("snapshotReceiverStop %s", s); @@ -441,15 +446,15 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { // get receiver SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver; bool needRsp = false; + int32_t writeCode = 0; // state, term, seq/ack if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) { if (pMsg->term == pSyncNode->pRaftStore->currentTerm) { if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) { // begin - snapshotReceiverStart(pReceiver); + snapshotReceiverStart(pReceiver, pMsg->privateTerm); pReceiver->ack = pMsg->seq; - pReceiver->privateTerm = pMsg->privateTerm; needRsp = true; char *msgStr = syncSnapshotSend2Str(pMsg); @@ -459,7 +464,9 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) { // end, finish FSM - pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen); + writeCode = pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen); + ASSERT(writeCode == 0); + pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, true); pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, pMsg->lastIndex + 1); @@ -472,11 +479,8 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm, logSimpleStr); taosMemoryFree(logSimpleStr); - // walRestoreFromSnapshot(pSyncNode->pWal, pMsg->lastIndex); - // sInfo("walRestoreFromSnapshot lastIndex:%ld", pMsg->lastIndex); - pReceiver->pWriter = NULL; - snapshotReceiverStop(pReceiver); + snapshotReceiverStop(pReceiver, true); pReceiver->ack = pMsg->seq; needRsp = true; @@ -487,7 +491,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) { pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, false); - snapshotReceiverStop(pReceiver); + snapshotReceiverStop(pReceiver, false); needRsp = false; char *msgStr = syncSnapshotSend2Str(pMsg); @@ -499,7 +503,9 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) { // transfering if (pMsg->seq == pReceiver->ack + 1) { - pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen); + writeCode = + pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen); + ASSERT(writeCode == 0); pReceiver->ack = pMsg->seq; } needRsp = true; @@ -521,6 +527,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { pRspMsg->lastIndex = pMsg->lastIndex; pRspMsg->lastTerm = pMsg->lastTerm; pRspMsg->ack = pReceiver->ack; + pRspMsg->code = writeCode; pRspMsg->privateTerm = pReceiver->privateTerm; SRpcMsg rpcMsg; @@ -541,12 +548,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { // if ack == SYNC_SNAPSHOT_SEQ_END, stop sender int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) { // get sender - SSyncSnapshotSender *pSender = NULL; - for (int i = 0; i < pSyncNode->replicaNum; ++i) { - if (syncUtilSameId(&(pMsg->srcId), &((pSyncNode->replicasId)[i]))) { - pSender = (pSyncNode->senders)[i]; - } - } + SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &(pMsg->srcId)); ASSERT(pSender != NULL); // state, term, seq/ack @@ -554,12 +556,8 @@ int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) { if (pMsg->term == pSyncNode->pRaftStore->currentTerm) { // receiver ack is finish, close sender if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) { - pSender->apply = true; + pSender->finish = true; snapshotSenderStop(pSender); - - // update nextIndex private term - // syncIndexMgrSetTerm(pSyncNode->pNextIndex, &(pMsg->srcId), pSender->privateTerm); - return 0; } @@ -569,8 +567,10 @@ int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) { pSender->ack = pMsg->ack; (pSender->seq)++; snapshotSend(pSender); + } else if (pMsg->ack == pSender->seq - 1) { snapshotReSend(pSender); + } else { ASSERT(0); } diff --git a/source/libs/sync/test/syncSnapshotRspTest.cpp b/source/libs/sync/test/syncSnapshotRspTest.cpp index d624d45c94..f689d47aaf 100644 --- a/source/libs/sync/test/syncSnapshotRspTest.cpp +++ b/source/libs/sync/test/syncSnapshotRspTest.cpp @@ -25,6 +25,7 @@ SyncSnapshotRsp *createMsg() { pMsg->lastIndex = 22; pMsg->lastTerm = 33; pMsg->ack = 44; + pMsg->code = 55; return pMsg; } -- GitLab