From 4c410bd5eceda845ed6850d986cbe516f7a4cc80 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 28 Jun 2022 11:33:14 +0800 Subject: [PATCH] refactor(sync): refactor snapshot code --- source/libs/sync/inc/syncSnapshot.h | 4 +- source/libs/sync/src/syncAppendEntriesReply.c | 28 +- source/libs/sync/src/syncSnapshot.c | 247 ++++++++++++------ 3 files changed, 168 insertions(+), 111 deletions(-) diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index 55e0755a89..14688f8912 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -57,7 +57,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI void snapshotSenderDestroy(SSyncSnapshotSender *pSender); bool snapshotSenderIsStart(SSyncSnapshotSender *pSender); void snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void *pReader); -void snapshotSenderStop(SSyncSnapshotSender *pSender); +void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish); int32_t snapshotSend(SSyncSnapshotSender *pSender); int32_t snapshotReSend(SSyncSnapshotSender *pSender); @@ -82,7 +82,7 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver); void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SyncSnapshotSend *pBeginMsg); bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver); -void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply); +void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver); cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index b7122b9c52..10f2745651 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -240,7 +240,10 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId)); ASSERT(pSender != NULL); - SSnapshot snapshot; + SSnapshot snapshot = {.data = NULL, + .lastApplyIndex = SYNC_INDEX_INVALID, + .lastApplyTerm = 0, + .lastConfigIndex = SYNC_INDEX_INVALID}; void* pReader = NULL; ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot, NULL, &pReader); if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN && nextIndex <= snapshot.lastApplyIndex + 1 && @@ -249,10 +252,6 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries ASSERT(pReader != NULL); snapshotSenderStart(pSender, snapshot, pReader); - char* eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender start"); - syncNodeEventLog(ths, eventLog); - taosMemoryFree(eventLog); - } else { // no snapshot if (pReader != NULL) { @@ -260,23 +259,6 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries } } - /* - bool hasSnapshot = syncNodeHasSnapshot(ths); - SSnapshot snapshot; - ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot); - - // start sending snapshot first time - // start here, stop by receiver - if (hasSnapshot && nextIndex <= snapshot.lastApplyIndex + 1 && !snapshotSenderIsStart(pSender) && - pMsg->privateTerm < pSender->privateTerm) { - snapshotSenderStart(pSender); - - char* eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender start"); - syncNodeEventLog(ths, eventLog); - taosMemoryFree(eventLog); - } - */ - SyncIndex sentryIndex = pSender->snapshot.lastApplyIndex + 1; // update nextIndex to sentryIndex @@ -300,5 +282,5 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries syncIndexMgrLog2("recv sync-append-entries-reply, after pNextIndex:", ths->pNextIndex); syncIndexMgrLog2("recv sync-append-entries-reply, after pMatchIndex:", ths->pMatchIndex); - return ret; + return 0; } \ No newline at end of file diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 39e972c4c4..7b15be2d14 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -24,6 +24,7 @@ //---------------------------------- static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SyncSnapshotSend *pBeginMsg); +static void snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg); //---------------------------------- SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) { @@ -50,7 +51,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &(pSender->snapshot)); pSender->finish = false; } else { - sError("vgId:%d cannot create snapshot sender", pSyncNode->vgId); + sError("vgId:%d, cannot create snapshot sender", pSyncNode->vgId); } return pSender; @@ -127,7 +128,7 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void syncEntryDestory(pEntry); } else { if (pSender->snapshot.lastConfigIndex == pSender->pSyncNode->pRaftCfg->lastConfigIndex) { - sTrace("vgId:%d sync sender get cfg from local", pSender->pSyncNode->vgId); + sTrace("vgId:%d, sync sender get cfg from local", pSender->pSyncNode->vgId); pSender->lastConfig = pSender->pSyncNode->pRaftCfg->cfg; getLastConfig = true; } @@ -176,13 +177,13 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void // event log do { - char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender send"); + char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender start"); syncNodeEventLog(pSender->pSyncNode, eventLog); taosMemoryFree(eventLog); } while (0); } -void snapshotSenderStop(SSyncSnapshotSender *pSender) { +void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) { // close reader if (pSender->pReader != NULL) { int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader); @@ -199,6 +200,14 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender) { // update flag pSender->start = false; + pSender->finish = finish; + + // event log + do { + char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender stop"); + syncNodeEventLog(pSender->pSyncNode, eventLog); + taosMemoryFree(eventLog); + } while (0); } // when sender receive ack, call this function to send msg from seq @@ -386,7 +395,7 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from pReceiver->snapshot.lastConfigIndex = SYNC_INDEX_INVALID; } else { - sError("vgId:%d cannot create snapshot receiver", pSyncNode->vgId); + sError("vgId:%d, cannot create snapshot receiver", pSyncNode->vgId); } return pReceiver; @@ -409,9 +418,9 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) { bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) { return pReceiver->start; } -// static do start +// static do start by privateTerm, pBeginMsg // receive first snapshot data -// privateTerm, pBeginMsg +// write first block data static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SyncSnapshotSend *pBeginMsg) { // update state @@ -419,6 +428,7 @@ static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm p pReceiver->privateTerm = privateTerm; pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN; pReceiver->fromId = pBeginMsg->srcId; + pReceiver->start = true; // update snapshot pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex; @@ -429,8 +439,16 @@ static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm p ASSERT(pReceiver->pWriter == NULL); int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &(pReceiver->pWriter)); ASSERT(ret == 0); + + // event log + do { + char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver start"); + syncNodeEventLog(pReceiver->pSyncNode, eventLog); + taosMemoryFree(eventLog); + } while (0); } +// force stop static void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) { // force close, abandon incomplete data if (pReceiver->pWriter != NULL) { @@ -441,33 +459,35 @@ static void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) { } pReceiver->start = false; + + // event log + do { + char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver force stop"); + syncNodeEventLog(pReceiver->pSyncNode, eventLog); + taosMemoryFree(eventLog); + } while (0); } // if receiver receive msg from seq = SYNC_SNAPSHOT_SEQ_BEGIN, start receiver // if already start, force close, start again void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SyncSnapshotSend *pBeginMsg) { if (!snapshotReceiverIsStart(pReceiver)) { - // start + // first start snapshotReceiverDoStart(pReceiver, privateTerm, pBeginMsg); - pReceiver->start = true; } else { // already start - sInfo("snapshot recv, receiver already start"); + sInfo("vgId:%d, snapshot recv, receiver already start", pReceiver->pSyncNode->vgId); // force close, abandon incomplete data - int32_t ret = - pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false); - ASSERT(ret == 0); - pReceiver->pWriter = NULL; + snapshotReceiverForceStop(pReceiver); // start again snapshotReceiverDoStart(pReceiver, privateTerm, pBeginMsg); - pReceiver->start = true; } } -void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply) { +void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) { if (pReceiver->pWriter != NULL) { int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false); @@ -477,8 +497,69 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply) { pReceiver->start = false; - if (apply) { - // ++(pReceiver->privateTerm); + // event log + do { + SSnapshot snapshot; + pReceiver->pSyncNode->pFsm->FpGetSnapshotInfo(pReceiver->pSyncNode->pFsm, &snapshot); + char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver stop"); + syncNodeEventLog(pReceiver->pSyncNode, eventLog); + taosMemoryFree(eventLog); + } while (0); +} + +static void snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) { + ASSERT(pMsg->seq == SYNC_SNAPSHOT_SEQ_END); + + if (pReceiver->pWriter != NULL) { + int32_t code = 0; + if (pMsg->dataLen > 0) { + code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, + pMsg->dataLen); + ASSERT(code == 0); + } + + code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true); + ASSERT(code == 0); + pReceiver->pWriter = NULL; + } + + pReceiver->ack = SYNC_SNAPSHOT_SEQ_END; + + // update commit index + if (pReceiver->snapshot.lastApplyIndex > pReceiver->pSyncNode->commitIndex) { + pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex; + } + + // reset wal + pReceiver->pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pReceiver->pSyncNode->pLogStore, pMsg->lastIndex); + + // event log + do { + SSnapshot snapshot; + pReceiver->pSyncNode->pFsm->FpGetSnapshotInfo(pReceiver->pSyncNode->pFsm, &snapshot); + char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver got last data, finish, apply snapshot"); + syncNodeEventLog(pReceiver->pSyncNode, eventLog); + taosMemoryFree(eventLog); + } while (0); +} + +static void snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) { + ASSERT(pMsg->seq == pReceiver->ack + 1); + + if (pReceiver->pWriter != NULL) { + if (pMsg->dataLen > 0) { + int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, + pMsg->data, pMsg->dataLen); + ASSERT(code == 0); + } + pReceiver->ack = pMsg->seq; + + // event log + do { + char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver receiving"); + syncNodeEventLog(pReceiver->pSyncNode, eventLog); + taosMemoryFree(eventLog); + } while (0); } } @@ -560,33 +641,20 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { // get receiver SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver; bool needRsp = false; - int32_t writeCode = 0; // state, term, seq/ack if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) { if (pMsg->term == pSyncNode->pRaftStore->currentTerm) { if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) { - // begin + // begin, no data snapshotReceiverStart(pReceiver, pMsg->privateTerm, pMsg); - pReceiver->ack = pMsg->seq; needRsp = true; - char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver begin"); - syncNodeEventLog(pSyncNode, eventLog); - taosMemoryFree(eventLog); - } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) { // end, finish FSM - writeCode = pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen); - ASSERT(writeCode == 0); - - pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, true); - if (pReceiver->snapshot.lastApplyIndex > pReceiver->pSyncNode->commitIndex) { - pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex; - } - - // pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, pMsg->lastIndex + 1); - pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pSyncNode->pLogStore, pMsg->lastIndex); + snapshotReceiverFinish(pReceiver, pMsg); + snapshotReceiverStop(pReceiver); + needRsp = true; // maybe update lastconfig if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) { @@ -601,89 +669,74 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { syncNodeDoConfigChange(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex); } - SSnapshot snapshot; - pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); - - do { - char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver finish, apply snapshot"); - syncNodeEventLog(pSyncNode, eventLog); - taosMemoryFree(eventLog); - } while (0); - - pReceiver->pWriter = NULL; - snapshotReceiverStop(pReceiver, true); - pReceiver->ack = pMsg->seq; - needRsp = true; - - do { - char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver stop"); - syncNodeEventLog(pSyncNode, eventLog); - taosMemoryFree(eventLog); - } while (0); - } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) { - pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, false); - snapshotReceiverStop(pReceiver, false); + // force close + snapshotReceiverForceStop(pReceiver); needRsp = false; - do { - char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver force close"); - syncNodeEventLog(pSyncNode, eventLog); - taosMemoryFree(eventLog); - } while (0); - } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) { // transfering if (pMsg->seq == pReceiver->ack + 1) { - writeCode = - pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen); - ASSERT(writeCode == 0); - pReceiver->ack = pMsg->seq; + snapshotReceiverGotData(pReceiver, pMsg); } needRsp = true; - do { - char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver receiving"); - syncNodeEventLog(pSyncNode, eventLog); - taosMemoryFree(eventLog); - } while (0); - } else { ASSERT(0); } + // send ack if (needRsp) { + // build msg SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId); pRspMsg->srcId = pSyncNode->myRaftId; pRspMsg->destId = pMsg->srcId; pRspMsg->term = pSyncNode->pRaftStore->currentTerm; pRspMsg->lastIndex = pMsg->lastIndex; pRspMsg->lastTerm = pMsg->lastTerm; - pRspMsg->ack = pReceiver->ack; - pRspMsg->code = writeCode; - pRspMsg->privateTerm = pReceiver->privateTerm; + pRspMsg->ack = pReceiver->ack; // receiver maybe already closed + pRspMsg->code = 0; + pRspMsg->privateTerm = pReceiver->privateTerm; // receiver maybe already closed + // send msg SRpcMsg rpcMsg; syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg); syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg); - syncSnapshotRspDestroy(pRspMsg); } + } else { + // error log + do { + char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver term not equal"); + syncNodeErrorLog(pSyncNode, eventLog); + taosMemoryFree(eventLog); + } while (0); } } else { - syncNodeLog2("syncNodeOnSnapshotSendCb not follower", pSyncNode); + // error log + do { + char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver not follower"); + syncNodeErrorLog(pSyncNode, eventLog); + taosMemoryFree(eventLog); + } while (0); } return 0; } +static void snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) { + ASSERT(pMsg->ack == pSender->seq); + pSender->ack = pMsg->ack; + ++(pSender->seq); +} + // sender receives ack, set seq = ack + 1, send msg from seq // if ack == SYNC_SNAPSHOT_SEQ_END, stop sender int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) { // if already drop replica, do not process if (!syncNodeInRaftGroup(pSyncNode, &(pMsg->srcId)) && pSyncNode->state == TAOS_SYNC_STATE_LEADER) { - sInfo("recv SyncSnapshotRsp maybe replica already dropped"); - return 0; + sError("vgId:%d, recv sync-snapshot-rsp, maybe replica already dropped", pSyncNode->vgId); + return -1; } // get sender @@ -695,27 +748,49 @@ int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) { if (pMsg->term == pSyncNode->pRaftStore->currentTerm) { // receiver ack is finish, close sender if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) { - pSender->finish = true; - snapshotSenderStop(pSender); + snapshotSenderStop(pSender, true); return 0; } // send next msg if (pMsg->ack == pSender->seq) { // update sender ack - pSender->ack = pMsg->ack; - (pSender->seq)++; + snapshotSenderUpdateProgress(pSender, pMsg); snapshotSend(pSender); } else if (pMsg->ack == pSender->seq - 1) { snapshotReSend(pSender); } else { - ASSERT(0); + do { + char logBuf[64]; + snprintf(logBuf, sizeof(logBuf), "error ack, recv ack:%d, my seq:%d", pMsg->ack, pSender->seq); + char *eventLog = snapshotSender2SimpleStr(pSender, logBuf); + syncNodeErrorLog(pSyncNode, eventLog); + taosMemoryFree(eventLog); + } while (0); + + return -1; } + } else { + // error log + do { + char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender term not equal"); + syncNodeErrorLog(pSyncNode, eventLog); + taosMemoryFree(eventLog); + } while (0); + + return -1; } } else { - syncNodeLog2("syncNodeOnSnapshotRspCb not leader", pSyncNode); + // error log + do { + char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender not leader"); + syncNodeErrorLog(pSyncNode, eventLog); + taosMemoryFree(eventLog); + } while (0); + + return -1; } return 0; -- GitLab