From 71c57d7769d54ffc65e7d694c494dfeebaec9f39 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 1 Jun 2022 11:28:33 +0800 Subject: [PATCH] refactor: sender code --- include/libs/sync/syncTools.h | 4 +- source/libs/sync/inc/syncSnapshot.h | 18 +++--- source/libs/sync/src/syncMain.c | 46 +++++++++++++++ source/libs/sync/src/syncSnapshot.c | 89 +++++++++++++++++++++-------- 4 files changed, 122 insertions(+), 35 deletions(-) diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index e619f50c90..f8d5768b5d 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -394,8 +394,8 @@ typedef struct SyncSnapshotSend { SRaftId destId; SyncTerm term; - SyncIndex lastIndex; - SyncTerm lastTerm; + SyncIndex lastIndex; // lastIndex of snapshot + SyncTerm lastTerm; // lastTerm of snapshot int32_t seq; uint32_t dataLen; char data[]; diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index e1a252eed4..281e30f44e 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -33,12 +33,14 @@ extern "C" { #define SYNC_SNAPSHOT_SEQ_BEGIN 0 #define SYNC_SNAPSHOT_SEQ_END 0x7FFFFFFF +#define SYNC_SNAPSHOT_RETRY_MS 5000 + typedef struct SSyncSnapshotSender { bool start; int32_t seq; int32_t ack; - void * pReader; - void * pCurrentBlock; + void *pReader; + void *pCurrentBlock; int32_t blockLen; SSnapshot snapshot; int64_t sendingMS; @@ -52,15 +54,15 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender); void snapshotSenderStart(SSyncSnapshotSender *pSender); void snapshotSenderStop(SSyncSnapshotSender *pSender); int32_t snapshotSend(SSyncSnapshotSender *pSender); -cJSON * snapshotSender2Json(SSyncSnapshotSender *pSender); -char * snapshotSender2Str(SSyncSnapshotSender *pSender); +cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender); +char *snapshotSender2Str(SSyncSnapshotSender *pSender); typedef struct SSyncSnapshotReceiver { bool start; int32_t ack; - void * pWriter; - void * pCurrentBlock; + void *pWriter; + void *pCurrentBlock; int32_t blockLen; SyncTerm term; @@ -72,8 +74,8 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, int32_t repl void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver); void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver); void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver); -cJSON * snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); -char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); +cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); +char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); int32_t syncNodeOnSnapshotSendCb(SSyncNode *ths, SyncSnapshotSend *pMsg); int32_t syncNodeOnSnapshotRspCb(SSyncNode *ths, SyncSnapshotRsp *pMsg); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index b26fda446e..9ccd98f557 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -29,6 +29,7 @@ #include "syncRequestVote.h" #include "syncRequestVoteReply.h" #include "syncRespMgr.h" +#include "syncSnapshot.h" #include "syncTimeout.h" #include "syncUtil.h" #include "syncVoteMgr.h" @@ -583,6 +584,20 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { // } // tsem_init(&(pSyncNode->restoreSem), 0, 0); + // snapshot senders + for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { + SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, i); + // ASSERT(pSender != NULL); + (pSyncNode->senders)[i] = pSender; + } + + // snapshot receivers + for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { + SSyncSnapshotReceiver* pReceiver = snapshotReceiverCreate(pSyncNode, i); + // ASSERT(pReceiver != NULL); + (pSyncNode->receivers)[i] = pReceiver; + } + // start in syncNodeStart // start raft // syncNodeBecomeFollower(pSyncNode); @@ -674,6 +689,20 @@ void syncNodeClose(SSyncNode* pSyncNode) { taosMemoryFree(pSyncNode->pFsm); } + for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { + if ((pSyncNode->senders)[i] != NULL) { + snapshotSenderDestroy((pSyncNode->senders)[i]); + (pSyncNode->senders)[i] = NULL; + } + } + + for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { + if ((pSyncNode->receivers)[i] != NULL) { + snapshotReceiverDestroy((pSyncNode->receivers)[i]); + (pSyncNode->receivers)[i] = NULL; + } + } + /* if (pSyncNode->pSnapshot != NULL) { taosMemoryFree(pSyncNode->pSnapshot); @@ -969,6 +998,23 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) { cJSON_AddStringToObject(pRoot, "FpOnAppendEntriesReply", u64buf); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnTimeout); cJSON_AddStringToObject(pRoot, "FpOnTimeout", u64buf); + + // restoreFinish + cJSON_AddNumberToObject(pRoot, "restoreFinish", pSyncNode->restoreFinish); + + // snapshot senders + cJSON* pSenders = cJSON_CreateArray(); + cJSON_AddItemToObject(pRoot, "senders", pSenders); + for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { + cJSON_AddItemToArray(pSenders, snapshotSender2Json((pSyncNode->senders)[i])); + } + + // snapshot receivers + cJSON* pReceivers = cJSON_CreateArray(); + cJSON_AddItemToObject(pRoot, "receivers", pReceivers); + for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { + cJSON_AddItemToArray(pReceivers, snapshotReceiver2Json((pSyncNode->receivers)[i])); + } } cJSON* pJson = cJSON_CreateObject(); diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 5fc49d9ffa..b9cd5422c9 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -20,24 +20,28 @@ static void snapshotSenderDoStart(SSyncSnapshotSender *pSender); SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) { - ASSERT(pSyncNode->pFsm->FpSnapshotStartRead != NULL); - ASSERT(pSyncNode->pFsm->FpSnapshotStopRead != NULL); - ASSERT(pSyncNode->pFsm->FpSnapshotDoRead != NULL); + bool condition = (pSyncNode->pFsm->FpSnapshotStartRead != NULL) && (pSyncNode->pFsm->FpSnapshotStopRead != NULL) && + (pSyncNode->pFsm->FpSnapshotDoRead != NULL); - SSyncSnapshotSender *pSender = taosMemoryMalloc(sizeof(SSyncSnapshotSender)); - ASSERT(pSender != NULL); - memset(pSender, 0, sizeof(*pSender)); - - pSender->start = false; - pSender->seq = SYNC_SNAPSHOT_SEQ_INVALID; - pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID; - pSender->pReader = NULL; - pSender->pCurrentBlock = NULL; - pSender->blockLen = 0; - pSender->sendingMS = 5000; - pSender->pSyncNode = pSyncNode; - pSender->replicaIndex = replicaIndex; - pSender->term = pSyncNode->pRaftStore->currentTerm; + SSyncSnapshotSender *pSender = NULL; + if (condition) { + pSender = taosMemoryMalloc(sizeof(SSyncSnapshotSender)); + ASSERT(pSender != NULL); + memset(pSender, 0, sizeof(*pSender)); + + pSender->start = false; + pSender->seq = SYNC_SNAPSHOT_SEQ_INVALID; + pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID; + pSender->pReader = NULL; + pSender->pCurrentBlock = NULL; + pSender->blockLen = 0; + pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS; + pSender->pSyncNode = pSyncNode; + pSender->replicaIndex = replicaIndex; + pSender->term = pSyncNode->pRaftStore->currentTerm; + } else { + sInfo("snapshotSenderCreate cannot create sender"); + } return pSender; } @@ -48,38 +52,47 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender) { } } +// begin send snapshot (current term, seq begin) static void snapshotSenderDoStart(SSyncSnapshotSender *pSender) { pSender->term = pSender->pSyncNode->pRaftStore->currentTerm; pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN; pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID; + // open snapshot reader + ASSERT(pSender->pReader == NULL); int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStartRead(pSender->pSyncNode->pFsm, &(pSender->pReader)); ASSERT(ret == 0); + // get current snapshot info pSender->pSyncNode->pFsm->FpGetSnapshot(pSender->pSyncNode->pFsm, &(pSender->snapshot)); + // build begin msg SyncSnapshotSend *pMsg = syncSnapshotSendBuild(0, pSender->pSyncNode->vgId); pMsg->srcId = pSender->pSyncNode->myRaftId; pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex]; pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm; pMsg->lastIndex = pSender->snapshot.lastApplyIndex; pMsg->lastTerm = pSender->snapshot.lastApplyTerm; - pMsg->seq = pSender->seq; + pMsg->seq = pSender->seq; // SYNC_SNAPSHOT_SEQ_BEGIN + // send SRpcMsg rpcMsg; syncSnapshotSend2RpcMsg(pMsg, &rpcMsg); syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg); syncSnapshotSendDestroy(pMsg); } +// when entry in snapshot, start sender void snapshotSenderStart(SSyncSnapshotSender *pSender) { if (!(pSender->start)) { + // start snapshotSenderDoStart(pSender); pSender->start = true; } else { + // already start ASSERT(pSender->pSyncNode->pRaftStore->currentTerm >= pSender->term); - // leader change + // if current term is higher, need start again if (pSender->pSyncNode->pRaftStore->currentTerm > pSender->term) { // force peer rollback SyncSnapshotSend *pMsg = syncSnapshotSendBuild(0, pSender->pSyncNode->vgId); @@ -98,30 +111,40 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) { // close reader int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader); ASSERT(ret == 0); + pSender->pReader = NULL; // start again snapshotSenderDoStart(pSender); } else { - // do nothing + // current term, do nothing + ASSERT(pSender->pSyncNode->pRaftStore->currentTerm == pSender->term); } } } void snapshotSenderStop(SSyncSnapshotSender *pSender) { - int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader); - ASSERT(ret == 0); + if (pSender->pReader != NULL) { + int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader); + ASSERT(ret == 0); + pSender->pReader = NULL; + } if (pSender->pCurrentBlock != NULL) { taosMemoryFree(pSender->pCurrentBlock); + pSender->pCurrentBlock = NULL; pSender->blockLen = 0; } + + pSender->start = false; } -// send msg from seq, seq is already updated +// when sender receiver ack, call this function to send msg from seq +// seq = ack + 1, already updated int32_t snapshotSend(SSyncSnapshotSender *pSender) { // free memory last time (seq - 1) if (pSender->pCurrentBlock != NULL) { taosMemoryFree(pSender->pCurrentBlock); + pSender->pCurrentBlock = NULL; pSender->blockLen = 0; } @@ -129,6 +152,12 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader, &(pSender->pCurrentBlock), &(pSender->blockLen)); ASSERT(ret == 0); + if (pSender->blockLen > 0) { + // has read data + } else { + // read finish + pSender->seq = SYNC_SNAPSHOT_SEQ_END; + } SyncSnapshotSend *pMsg = syncSnapshotSendBuild(pSender->blockLen, pSender->pSyncNode->vgId); pMsg->srcId = pSender->pSyncNode->myRaftId; @@ -347,8 +376,10 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { return 0; } -// sender do something +// sender receives ack, set seq = ack + 1, send msg from seq +// if ack == SYNC_SNAPSHOT_SEQ_END, stop sender int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) { + // get sender SSyncSnapshotSender *pSender = NULL; for (int i = 0; i < pSyncNode->replicaNum; ++i) { if (syncUtilSameId(&(pMsg->srcId), &((pSyncNode->replicasId)[i]))) { @@ -360,10 +391,18 @@ int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) { // state, term, seq/ack if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { if (pMsg->term == pSyncNode->pRaftStore->currentTerm) { + // receiver ack is finish, close sender + if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) { + snapshotSenderStop(pSender); + return 0; + } + + // send next msg if (pMsg->ack == pSender->seq) { + // update sender ack pSender->ack = pMsg->ack; - snapshotSend(pSender); (pSender->seq)++; + snapshotSend(pSender); } } } -- GitLab