提交 71c57d77 编写于 作者: M Minghao Li

refactor: sender code

上级 9988e85f
...@@ -394,8 +394,8 @@ typedef struct SyncSnapshotSend { ...@@ -394,8 +394,8 @@ typedef struct SyncSnapshotSend {
SRaftId destId; SRaftId destId;
SyncTerm term; SyncTerm term;
SyncIndex lastIndex; SyncIndex lastIndex; // lastIndex of snapshot
SyncTerm lastTerm; SyncTerm lastTerm; // lastTerm of snapshot
int32_t seq; int32_t seq;
uint32_t dataLen; uint32_t dataLen;
char data[]; char data[];
......
...@@ -33,12 +33,14 @@ extern "C" { ...@@ -33,12 +33,14 @@ extern "C" {
#define SYNC_SNAPSHOT_SEQ_BEGIN 0 #define SYNC_SNAPSHOT_SEQ_BEGIN 0
#define SYNC_SNAPSHOT_SEQ_END 0x7FFFFFFF #define SYNC_SNAPSHOT_SEQ_END 0x7FFFFFFF
#define SYNC_SNAPSHOT_RETRY_MS 5000
typedef struct SSyncSnapshotSender { typedef struct SSyncSnapshotSender {
bool start; bool start;
int32_t seq; int32_t seq;
int32_t ack; int32_t ack;
void * pReader; void *pReader;
void * pCurrentBlock; void *pCurrentBlock;
int32_t blockLen; int32_t blockLen;
SSnapshot snapshot; SSnapshot snapshot;
int64_t sendingMS; int64_t sendingMS;
...@@ -52,15 +54,15 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender); ...@@ -52,15 +54,15 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender);
void snapshotSenderStart(SSyncSnapshotSender *pSender); void snapshotSenderStart(SSyncSnapshotSender *pSender);
void snapshotSenderStop(SSyncSnapshotSender *pSender); void snapshotSenderStop(SSyncSnapshotSender *pSender);
int32_t snapshotSend(SSyncSnapshotSender *pSender); int32_t snapshotSend(SSyncSnapshotSender *pSender);
cJSON * snapshotSender2Json(SSyncSnapshotSender *pSender); cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender);
char * snapshotSender2Str(SSyncSnapshotSender *pSender); char *snapshotSender2Str(SSyncSnapshotSender *pSender);
typedef struct SSyncSnapshotReceiver { typedef struct SSyncSnapshotReceiver {
bool start; bool start;
int32_t ack; int32_t ack;
void * pWriter; void *pWriter;
void * pCurrentBlock; void *pCurrentBlock;
int32_t blockLen; int32_t blockLen;
SyncTerm term; SyncTerm term;
...@@ -72,8 +74,8 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, int32_t repl ...@@ -72,8 +74,8 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, int32_t repl
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver); void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver);
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver); void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver);
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver); void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver);
cJSON * snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver);
char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
int32_t syncNodeOnSnapshotSendCb(SSyncNode *ths, SyncSnapshotSend *pMsg); int32_t syncNodeOnSnapshotSendCb(SSyncNode *ths, SyncSnapshotSend *pMsg);
int32_t syncNodeOnSnapshotRspCb(SSyncNode *ths, SyncSnapshotRsp *pMsg); int32_t syncNodeOnSnapshotRspCb(SSyncNode *ths, SyncSnapshotRsp *pMsg);
......
...@@ -29,6 +29,7 @@ ...@@ -29,6 +29,7 @@
#include "syncRequestVote.h" #include "syncRequestVote.h"
#include "syncRequestVoteReply.h" #include "syncRequestVoteReply.h"
#include "syncRespMgr.h" #include "syncRespMgr.h"
#include "syncSnapshot.h"
#include "syncTimeout.h" #include "syncTimeout.h"
#include "syncUtil.h" #include "syncUtil.h"
#include "syncVoteMgr.h" #include "syncVoteMgr.h"
...@@ -583,6 +584,20 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { ...@@ -583,6 +584,20 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
// } // }
// tsem_init(&(pSyncNode->restoreSem), 0, 0); // tsem_init(&(pSyncNode->restoreSem), 0, 0);
// snapshot senders
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, i);
// ASSERT(pSender != NULL);
(pSyncNode->senders)[i] = pSender;
}
// snapshot receivers
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
SSyncSnapshotReceiver* pReceiver = snapshotReceiverCreate(pSyncNode, i);
// ASSERT(pReceiver != NULL);
(pSyncNode->receivers)[i] = pReceiver;
}
// start in syncNodeStart // start in syncNodeStart
// start raft // start raft
// syncNodeBecomeFollower(pSyncNode); // syncNodeBecomeFollower(pSyncNode);
...@@ -674,6 +689,20 @@ void syncNodeClose(SSyncNode* pSyncNode) { ...@@ -674,6 +689,20 @@ void syncNodeClose(SSyncNode* pSyncNode) {
taosMemoryFree(pSyncNode->pFsm); taosMemoryFree(pSyncNode->pFsm);
} }
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
if ((pSyncNode->senders)[i] != NULL) {
snapshotSenderDestroy((pSyncNode->senders)[i]);
(pSyncNode->senders)[i] = NULL;
}
}
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
if ((pSyncNode->receivers)[i] != NULL) {
snapshotReceiverDestroy((pSyncNode->receivers)[i]);
(pSyncNode->receivers)[i] = NULL;
}
}
/* /*
if (pSyncNode->pSnapshot != NULL) { if (pSyncNode->pSnapshot != NULL) {
taosMemoryFree(pSyncNode->pSnapshot); taosMemoryFree(pSyncNode->pSnapshot);
...@@ -969,6 +998,23 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) { ...@@ -969,6 +998,23 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
cJSON_AddStringToObject(pRoot, "FpOnAppendEntriesReply", u64buf); cJSON_AddStringToObject(pRoot, "FpOnAppendEntriesReply", u64buf);
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnTimeout); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnTimeout);
cJSON_AddStringToObject(pRoot, "FpOnTimeout", u64buf); cJSON_AddStringToObject(pRoot, "FpOnTimeout", u64buf);
// restoreFinish
cJSON_AddNumberToObject(pRoot, "restoreFinish", pSyncNode->restoreFinish);
// snapshot senders
cJSON* pSenders = cJSON_CreateArray();
cJSON_AddItemToObject(pRoot, "senders", pSenders);
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
cJSON_AddItemToArray(pSenders, snapshotSender2Json((pSyncNode->senders)[i]));
}
// snapshot receivers
cJSON* pReceivers = cJSON_CreateArray();
cJSON_AddItemToObject(pRoot, "receivers", pReceivers);
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
cJSON_AddItemToArray(pReceivers, snapshotReceiver2Json((pSyncNode->receivers)[i]));
}
} }
cJSON* pJson = cJSON_CreateObject(); cJSON* pJson = cJSON_CreateObject();
......
...@@ -20,11 +20,12 @@ ...@@ -20,11 +20,12 @@
static void snapshotSenderDoStart(SSyncSnapshotSender *pSender); static void snapshotSenderDoStart(SSyncSnapshotSender *pSender);
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) { SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
ASSERT(pSyncNode->pFsm->FpSnapshotStartRead != NULL); bool condition = (pSyncNode->pFsm->FpSnapshotStartRead != NULL) && (pSyncNode->pFsm->FpSnapshotStopRead != NULL) &&
ASSERT(pSyncNode->pFsm->FpSnapshotStopRead != NULL); (pSyncNode->pFsm->FpSnapshotDoRead != NULL);
ASSERT(pSyncNode->pFsm->FpSnapshotDoRead != NULL);
SSyncSnapshotSender *pSender = taosMemoryMalloc(sizeof(SSyncSnapshotSender)); SSyncSnapshotSender *pSender = NULL;
if (condition) {
pSender = taosMemoryMalloc(sizeof(SSyncSnapshotSender));
ASSERT(pSender != NULL); ASSERT(pSender != NULL);
memset(pSender, 0, sizeof(*pSender)); memset(pSender, 0, sizeof(*pSender));
...@@ -34,10 +35,13 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI ...@@ -34,10 +35,13 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
pSender->pReader = NULL; pSender->pReader = NULL;
pSender->pCurrentBlock = NULL; pSender->pCurrentBlock = NULL;
pSender->blockLen = 0; pSender->blockLen = 0;
pSender->sendingMS = 5000; pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS;
pSender->pSyncNode = pSyncNode; pSender->pSyncNode = pSyncNode;
pSender->replicaIndex = replicaIndex; pSender->replicaIndex = replicaIndex;
pSender->term = pSyncNode->pRaftStore->currentTerm; pSender->term = pSyncNode->pRaftStore->currentTerm;
} else {
sInfo("snapshotSenderCreate cannot create sender");
}
return pSender; return pSender;
} }
...@@ -48,38 +52,47 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender) { ...@@ -48,38 +52,47 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
} }
} }
// Begin sending a snapshot to the peer at pSender->replicaIndex.
// Records the node's current term in the sender, resets seq to
// SYNC_SNAPSHOT_SEQ_BEGIN and ack to SYNC_SNAPSHOT_SEQ_INVALID, opens a
// snapshot reader through the FSM callback FpSnapshotStartRead, fetches the
// snapshot info through FpGetSnapshot, and sends one message carrying the
// snapshot's lastApplyIndex/lastApplyTerm and seq == SYNC_SNAPSHOT_SEQ_BEGIN.
// NOTE(review): this text is a side-by-side diff rendering; a line that shows
// the same statement twice holds the old and the new column of that line.
static void snapshotSenderDoStart(SSyncSnapshotSender *pSender) { static void snapshotSenderDoStart(SSyncSnapshotSender *pSender) {
pSender->term = pSender->pSyncNode->pRaftStore->currentTerm; pSender->term = pSender->pSyncNode->pRaftStore->currentTerm;
pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN; pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID; pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
// open snapshot reader
// the sender must not already hold a reader when a new one is opened
ASSERT(pSender->pReader == NULL);
int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStartRead(pSender->pSyncNode->pFsm, &(pSender->pReader)); int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStartRead(pSender->pSyncNode->pFsm, &(pSender->pReader));
ASSERT(ret == 0); ASSERT(ret == 0);
// get current snapshot info
pSender->pSyncNode->pFsm->FpGetSnapshot(pSender->pSyncNode->pFsm, &(pSender->snapshot));
// build begin msg
// first argument is 0 here; snapshotSend passes blockLen in the same position,
// so this is presumably the payload length (no data in the begin msg) - confirm
SyncSnapshotSend *pMsg = syncSnapshotSendBuild(0, pSender->pSyncNode->vgId); SyncSnapshotSend *pMsg = syncSnapshotSendBuild(0, pSender->pSyncNode->vgId);
pMsg->srcId = pSender->pSyncNode->myRaftId; pMsg->srcId = pSender->pSyncNode->myRaftId;
pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex]; pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm; pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
pMsg->lastIndex = pSender->snapshot.lastApplyIndex; pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
pMsg->lastTerm = pSender->snapshot.lastApplyTerm; pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
pMsg->seq = pSender->seq; pMsg->seq = pSender->seq; // SYNC_SNAPSHOT_SEQ_BEGIN
// send
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncSnapshotSend2RpcMsg(pMsg, &rpcMsg); syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg); syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
// the message struct is destroyed right after it has been handed to the send call
syncSnapshotSendDestroy(pMsg); syncSnapshotSendDestroy(pMsg);
} }
// when the entry is in the snapshot, start the sender
void snapshotSenderStart(SSyncSnapshotSender *pSender) { void snapshotSenderStart(SSyncSnapshotSender *pSender) {
if (!(pSender->start)) { if (!(pSender->start)) {
// start
snapshotSenderDoStart(pSender); snapshotSenderDoStart(pSender);
pSender->start = true; pSender->start = true;
} else { } else {
// already started
ASSERT(pSender->pSyncNode->pRaftStore->currentTerm >= pSender->term); ASSERT(pSender->pSyncNode->pRaftStore->currentTerm >= pSender->term);
// leader change // if the current term is higher, need to start again
if (pSender->pSyncNode->pRaftStore->currentTerm > pSender->term) { if (pSender->pSyncNode->pRaftStore->currentTerm > pSender->term) {
// force peer rollback // force peer rollback
SyncSnapshotSend *pMsg = syncSnapshotSendBuild(0, pSender->pSyncNode->vgId); SyncSnapshotSend *pMsg = syncSnapshotSendBuild(0, pSender->pSyncNode->vgId);
...@@ -98,30 +111,40 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) { ...@@ -98,30 +111,40 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) {
// close reader // close reader
int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader); int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
ASSERT(ret == 0); ASSERT(ret == 0);
pSender->pReader = NULL;
// start again // start again
snapshotSenderDoStart(pSender); snapshotSenderDoStart(pSender);
} else { } else {
// do nothing // current term, do nothing
ASSERT(pSender->pSyncNode->pRaftStore->currentTerm == pSender->term);
} }
} }
} }
// Stop the sender and release what it holds.
// If a snapshot reader is open, it is closed through the FSM callback
// FpSnapshotStopRead and pReader is cleared; if a current block is held, it is
// freed and blockLen is reset; finally the start flag is set to false.
// NOTE(review): this text is a side-by-side diff rendering; a line that shows
// the same statement twice holds the old and the new column of that line.
void snapshotSenderStop(SSyncSnapshotSender *pSender) { void snapshotSenderStop(SSyncSnapshotSender *pSender) {
// close the reader only when one is actually open
if (pSender->pReader != NULL) {
int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader); int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
ASSERT(ret == 0); ASSERT(ret == 0);
pSender->pReader = NULL;
}
// release the block buffer left from the last send, if any
if (pSender->pCurrentBlock != NULL) { if (pSender->pCurrentBlock != NULL) {
taosMemoryFree(pSender->pCurrentBlock); taosMemoryFree(pSender->pCurrentBlock);
pSender->pCurrentBlock = NULL;
pSender->blockLen = 0; pSender->blockLen = 0;
} }
// snapshotSenderStart checks this flag to decide whether to start from scratch
pSender->start = false;
} }
// send msg from seq, seq is already updated // when the sender receives an ack, call this function to send msg from seq
// seq = ack + 1, already updated
int32_t snapshotSend(SSyncSnapshotSender *pSender) { int32_t snapshotSend(SSyncSnapshotSender *pSender) {
// free memory last time (seq - 1) // free memory last time (seq - 1)
if (pSender->pCurrentBlock != NULL) { if (pSender->pCurrentBlock != NULL) {
taosMemoryFree(pSender->pCurrentBlock); taosMemoryFree(pSender->pCurrentBlock);
pSender->pCurrentBlock = NULL;
pSender->blockLen = 0; pSender->blockLen = 0;
} }
...@@ -129,6 +152,12 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { ...@@ -129,6 +152,12 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader, int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader,
&(pSender->pCurrentBlock), &(pSender->blockLen)); &(pSender->pCurrentBlock), &(pSender->blockLen));
ASSERT(ret == 0); ASSERT(ret == 0);
if (pSender->blockLen > 0) {
// has read data
} else {
// reading finished
pSender->seq = SYNC_SNAPSHOT_SEQ_END;
}
SyncSnapshotSend *pMsg = syncSnapshotSendBuild(pSender->blockLen, pSender->pSyncNode->vgId); SyncSnapshotSend *pMsg = syncSnapshotSendBuild(pSender->blockLen, pSender->pSyncNode->vgId);
pMsg->srcId = pSender->pSyncNode->myRaftId; pMsg->srcId = pSender->pSyncNode->myRaftId;
...@@ -347,8 +376,10 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -347,8 +376,10 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
return 0; return 0;
} }
// sender do something // sender receives ack, set seq = ack + 1, send msg from seq
// if ack == SYNC_SNAPSHOT_SEQ_END, stop sender
int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) { int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
// get sender
SSyncSnapshotSender *pSender = NULL; SSyncSnapshotSender *pSender = NULL;
for (int i = 0; i < pSyncNode->replicaNum; ++i) { for (int i = 0; i < pSyncNode->replicaNum; ++i) {
if (syncUtilSameId(&(pMsg->srcId), &((pSyncNode->replicasId)[i]))) { if (syncUtilSameId(&(pMsg->srcId), &((pSyncNode->replicasId)[i]))) {
...@@ -360,10 +391,18 @@ int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) { ...@@ -360,10 +391,18 @@ int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
// state, term, seq/ack // state, term, seq/ack
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
if (pMsg->term == pSyncNode->pRaftStore->currentTerm) { if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
// receiver acked SYNC_SNAPSHOT_SEQ_END (snapshot finished), close sender
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
snapshotSenderStop(pSender);
return 0;
}
// send next msg
if (pMsg->ack == pSender->seq) { if (pMsg->ack == pSender->seq) {
// update sender ack
pSender->ack = pMsg->ack; pSender->ack = pMsg->ack;
snapshotSend(pSender);
(pSender->seq)++; (pSender->seq)++;
snapshotSend(pSender);
} }
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册