提交 5f0d2342 编写于 作者: M Minghao Li

refactor: snapshotReSend

上级 060f8224
...@@ -370,50 +370,52 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -370,50 +370,52 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
// state, term, seq/ack // state, term, seq/ack
if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) { if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) { if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
// begin if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
snapshotReceiverStart(pReceiver); // begin
pReceiver->ack = pMsg->seq; snapshotReceiverStart(pReceiver);
needRsp = true; pReceiver->ack = pMsg->seq;
needRsp = true;
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
// end, finish FSM } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen); // end, finish FSM
pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, true);
snapshotReceiverStop(pReceiver);
pReceiver->ack = pMsg->seq;
needRsp = true;
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, false);
snapshotReceiverStop(pReceiver);
needRsp = false;
} else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
// transfering
if (pMsg->seq == pReceiver->ack + 1) {
pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen); pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen);
pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, true);
snapshotReceiverStop(pReceiver);
pReceiver->ack = pMsg->seq; pReceiver->ack = pMsg->seq;
} needRsp = true;
needRsp = true;
} else { } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
ASSERT(0); pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, false);
} snapshotReceiverStop(pReceiver);
needRsp = false;
if (needRsp) { } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId); // transfering
pRspMsg->srcId = pSyncNode->myRaftId; if (pMsg->seq == pReceiver->ack + 1) {
pRspMsg->destId = pMsg->srcId; pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen);
pRspMsg->term = pSyncNode->pRaftStore->currentTerm; pReceiver->ack = pMsg->seq;
pRspMsg->lastIndex = pMsg->lastIndex; }
pRspMsg->lastTerm = pMsg->lastTerm; needRsp = true;
pRspMsg->ack = pReceiver->ack;
SRpcMsg rpcMsg; } else {
syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg); ASSERT(0);
syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg); }
syncSnapshotRspDestroy(pRspMsg);
if (needRsp) {
SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId);
pRspMsg->srcId = pSyncNode->myRaftId;
pRspMsg->destId = pMsg->srcId;
pRspMsg->term = pSyncNode->pRaftStore->currentTerm;
pRspMsg->lastIndex = pMsg->lastIndex;
pRspMsg->lastTerm = pMsg->lastTerm;
pRspMsg->ack = pReceiver->ack;
SRpcMsg rpcMsg;
syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg);
syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg);
syncSnapshotRspDestroy(pRspMsg);
}
} }
} }
return 0; return 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册