提交 76185e17 编写于 作者: M Minghao Li

refactor(sync): SSyncSnapshotSender

上级 d72d2d69
...@@ -39,8 +39,8 @@ typedef struct SSyncSnapshotSender { ...@@ -39,8 +39,8 @@ typedef struct SSyncSnapshotSender {
bool start; bool start;
int32_t seq; int32_t seq;
int32_t ack; int32_t ack;
void * pReader; void *pReader;
void * pCurrentBlock; void *pCurrentBlock;
int32_t blockLen; int32_t blockLen;
SSnapshot snapshot; SSnapshot snapshot;
int64_t sendingMS; int64_t sendingMS;
...@@ -48,25 +48,24 @@ typedef struct SSyncSnapshotSender { ...@@ -48,25 +48,24 @@ typedef struct SSyncSnapshotSender {
int32_t replicaIndex; int32_t replicaIndex;
SyncTerm term; SyncTerm term;
SyncTerm privateTerm; SyncTerm privateTerm;
bool finish; bool apply;
} SSyncSnapshotSender; } SSyncSnapshotSender;
void snapshotSenderDoStart(SSyncSnapshotSender *pSender);
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex); SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex);
void snapshotSenderDestroy(SSyncSnapshotSender *pSender); void snapshotSenderDestroy(SSyncSnapshotSender *pSender);
// void snapshotSenderStart(SSyncSnapshotSender *pSender); bool snapshotSenderIsStart(SSyncSnapshotSender *pSender);
void snapshotSenderStop(SSyncSnapshotSender *pSender); void snapshotSenderStart(SSyncSnapshotSender *pSender);
int32_t snapshotSend(SSyncSnapshotSender *pSender); void snapshotSenderStop(SSyncSnapshotSender *pSender);
int32_t snapshotReSend(SSyncSnapshotSender *pSender); int32_t snapshotSend(SSyncSnapshotSender *pSender);
cJSON * snapshotSender2Json(SSyncSnapshotSender *pSender); int32_t snapshotReSend(SSyncSnapshotSender *pSender);
char * snapshotSender2Str(SSyncSnapshotSender *pSender); cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender);
char *snapshotSender2Str(SSyncSnapshotSender *pSender);
typedef struct SSyncSnapshotReceiver { typedef struct SSyncSnapshotReceiver {
bool start; bool start;
int32_t ack; int32_t ack;
void * pWriter; void *pWriter;
SyncTerm term; SyncTerm term;
SyncTerm privateTerm; SyncTerm privateTerm;
...@@ -78,8 +77,8 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, int32_t repl ...@@ -78,8 +77,8 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, int32_t repl
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver); void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver);
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver); void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver);
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver); void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver);
cJSON * snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver);
char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
int32_t syncNodeOnSnapshotSendCb(SSyncNode *ths, SyncSnapshotSend *pMsg); int32_t syncNodeOnSnapshotSendCb(SSyncNode *ths, SyncSnapshotSend *pMsg);
int32_t syncNodeOnSnapshotRspCb(SSyncNode *ths, SyncSnapshotRsp *pMsg); int32_t syncNodeOnSnapshotRspCb(SSyncNode *ths, SyncSnapshotRsp *pMsg);
......
...@@ -190,7 +190,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries ...@@ -190,7 +190,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
} else { } else {
// start first time // start first time
snapshotSenderDoStart(pSender); snapshotSenderStart(pSender);
pSender->start = true; pSender->start = true;
sentryIndex = pSender->snapshot.lastApplyIndex; sentryIndex = pSender->snapshot.lastApplyIndex;
} }
......
...@@ -20,7 +20,6 @@ ...@@ -20,7 +20,6 @@
#include "syncUtil.h" #include "syncUtil.h"
#include "wal.h" #include "wal.h"
// static void snapshotSenderDoStart(SSyncSnapshotSender *pSender);
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver); static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver);
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) { SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
...@@ -45,27 +44,28 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI ...@@ -45,27 +44,28 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
pSender->term = pSyncNode->pRaftStore->currentTerm; pSender->term = pSyncNode->pRaftStore->currentTerm;
pSender->privateTerm = taosGetTimestampMs() + 100; pSender->privateTerm = taosGetTimestampMs() + 100;
pSender->pSyncNode->pFsm->FpGetSnapshot(pSender->pSyncNode->pFsm, &(pSender->snapshot)); pSender->pSyncNode->pFsm->FpGetSnapshot(pSender->pSyncNode->pFsm, &(pSender->snapshot));
pSender->apply = false;
pSender->finish = false;
} else { } else {
sInfo("snapshotSenderCreate cannot create sender"); sError("snapshotSenderCreate cannot create sender");
} }
return pSender; return pSender;
} }
void snapshotSenderDestroy(SSyncSnapshotSender *pSender) { void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
if (pSender != NULL) { if (pSender != NULL) {
if (pSender->pCurrentBlock != NULL) {
taosMemoryFree(pSender->pCurrentBlock);
}
taosMemoryFree(pSender); taosMemoryFree(pSender);
} }
} }
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) { return pSender->start; }
// begin send snapshot (current term, seq begin) // begin send snapshot (current term, seq begin)
void snapshotSenderDoStart(SSyncSnapshotSender *pSender) { void snapshotSenderStart(SSyncSnapshotSender *pSender) {
// when start, increase term ASSERT(!snapshotSenderIsStart(pSender));
++(pSender->privateTerm);
pSender->term = pSender->pSyncNode->pRaftStore->currentTerm;
pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN; pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID; pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
...@@ -74,9 +74,21 @@ void snapshotSenderDoStart(SSyncSnapshotSender *pSender) { ...@@ -74,9 +74,21 @@ void snapshotSenderDoStart(SSyncSnapshotSender *pSender) {
int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStartRead(pSender->pSyncNode->pFsm, &(pSender->pReader)); int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStartRead(pSender->pSyncNode->pFsm, &(pSender->pReader));
ASSERT(ret == 0); ASSERT(ret == 0);
if (pSender->pCurrentBlock != NULL) {
taosMemoryFree(pSender->pCurrentBlock);
}
pSender->blockLen = 0;
// get current snapshot info // get current snapshot info
pSender->pSyncNode->pFsm->FpGetSnapshot(pSender->pSyncNode->pFsm, &(pSender->snapshot)); pSender->pSyncNode->pFsm->FpGetSnapshot(pSender->pSyncNode->pFsm, &(pSender->snapshot));
pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS;
pSender->term = pSender->pSyncNode->pRaftStore->currentTerm;
++(pSender->privateTerm);
pSender->apply = false;
pSender->start = true;
// build begin msg // build begin msg
SyncSnapshotSend *pMsg = syncSnapshotSendBuild(0, pSender->pSyncNode->vgId); SyncSnapshotSend *pMsg = syncSnapshotSendBuild(0, pSender->pSyncNode->vgId);
pMsg->srcId = pSender->pSyncNode->myRaftId; pMsg->srcId = pSender->pSyncNode->myRaftId;
...@@ -87,7 +99,7 @@ void snapshotSenderDoStart(SSyncSnapshotSender *pSender) { ...@@ -87,7 +99,7 @@ void snapshotSenderDoStart(SSyncSnapshotSender *pSender) {
pMsg->seq = pSender->seq; // SYNC_SNAPSHOT_SEQ_BEGIN pMsg->seq = pSender->seq; // SYNC_SNAPSHOT_SEQ_BEGIN
pMsg->privateTerm = pSender->privateTerm; pMsg->privateTerm = pSender->privateTerm;
// send // send msg
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncSnapshotSend2RpcMsg(pMsg, &rpcMsg); syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg); syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
...@@ -192,6 +204,7 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { ...@@ -192,6 +204,7 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
pSender->seq = SYNC_SNAPSHOT_SEQ_END; pSender->seq = SYNC_SNAPSHOT_SEQ_END;
} }
// build msg
SyncSnapshotSend *pMsg = syncSnapshotSendBuild(pSender->blockLen, pSender->pSyncNode->vgId); SyncSnapshotSend *pMsg = syncSnapshotSendBuild(pSender->blockLen, pSender->pSyncNode->vgId);
pMsg->srcId = pSender->pSyncNode->myRaftId; pMsg->srcId = pSender->pSyncNode->myRaftId;
pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex]; pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
...@@ -202,6 +215,7 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { ...@@ -202,6 +215,7 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
pMsg->privateTerm = pSender->privateTerm; pMsg->privateTerm = pSender->privateTerm;
memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen); memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);
// send msg
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncSnapshotSend2RpcMsg(pMsg, &rpcMsg); syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg); syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
...@@ -215,7 +229,6 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { ...@@ -215,7 +229,6 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
taosMemoryFree(msgStr); taosMemoryFree(msgStr);
syncSnapshotSendDestroy(pMsg); syncSnapshotSendDestroy(pMsg);
return 0; return 0;
} }
...@@ -286,7 +299,7 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) { ...@@ -286,7 +299,7 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
cJSON_AddStringToObject(pRoot, "term", u64buf); cJSON_AddStringToObject(pRoot, "term", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", pSender->privateTerm); snprintf(u64buf, sizeof(u64buf), "%lu", pSender->privateTerm);
cJSON_AddStringToObject(pRoot, "privateTerm", u64buf); cJSON_AddStringToObject(pRoot, "privateTerm", u64buf);
cJSON_AddNumberToObject(pRoot, "finish", pSender->finish); cJSON_AddNumberToObject(pRoot, "apply", pSender->apply);
} }
cJSON *pJson = cJSON_CreateObject(); cJSON *pJson = cJSON_CreateObject();
...@@ -296,7 +309,7 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) { ...@@ -296,7 +309,7 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
char *snapshotSender2Str(SSyncSnapshotSender *pSender) { char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
cJSON *pJson = snapshotSender2Json(pSender); cJSON *pJson = snapshotSender2Json(pSender);
char * serialized = cJSON_Print(pJson); char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
} }
...@@ -416,7 +429,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { ...@@ -416,7 +429,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
cJSON *pJson = snapshotReceiver2Json(pReceiver); cJSON *pJson = snapshotReceiver2Json(pReceiver);
char * serialized = cJSON_Print(pJson); char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
} }
...@@ -448,7 +461,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -448,7 +461,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, true); pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, true);
pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, pMsg->lastIndex + 1); pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, pMsg->lastIndex + 1);
char * logSimpleStr = logStoreSimple2Str(pSyncNode->pLogStore); char *logSimpleStr = logStoreSimple2Str(pSyncNode->pLogStore);
SSnapshot snapshot; SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
sInfo( sInfo(
...@@ -539,7 +552,7 @@ int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) { ...@@ -539,7 +552,7 @@ int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
if (pMsg->term == pSyncNode->pRaftStore->currentTerm) { if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
// receiver ack is finish, close sender // receiver ack is finish, close sender
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) { if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
pSender->finish = true; pSender->apply = true;
snapshotSenderStop(pSender); snapshotSenderStop(pSender);
// update nextIndex private term // update nextIndex private term
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册