未验证 提交 973fc99f 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #18094 from taosdata/feature/3.0_mhli

refactor(sync): pre snapshot
......@@ -38,6 +38,7 @@ extern "C" {
#define SYNC_MNODE_LOG_RETENTION 10000
#define SYNC_VNODE_LOG_RETENTION 100
#define SNAPSHOT_MAX_CLOCK_SKEW_MS 1000 * 10
#define SNAPSHOT_WAIT_MS 1000 * 30
#define SYNC_APPEND_ENTRIES_TIMEOUT_MS 10000
......
......@@ -282,8 +282,9 @@ void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId);
void syncNodeVoteForSelf(SSyncNode* pSyncNode);
// snapshot --------------
bool syncNodeHasSnapshot(SSyncNode* pSyncNode);
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode);
bool syncNodeHasSnapshot(SSyncNode* pSyncNode);
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode);
int32_t syncNodeStartSnapshot(SSyncNode* pSyncNode, SRaftId* pDestId);
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode);
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode);
......
......@@ -51,7 +51,7 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode);
int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* pDestId, SRpcMsg* pMsg);
int32_t syncNodeReplicate(SSyncNode* pSyncNode);
int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId);
int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapshot);
#ifdef __cplusplus
}
......
......@@ -43,6 +43,7 @@ typedef struct SSyncSnapshotSender {
int64_t sendingMS;
SyncTerm term;
int64_t startTime;
int64_t endTime;
bool finish;
// init when create
......@@ -59,14 +60,17 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender);
int32_t snapshotReSend(SSyncSnapshotSender *pSender);
typedef struct SSyncSnapshotReceiver {
bool start;
int32_t ack;
// update when pre snapshot
bool start;
int32_t ack;
SyncTerm term;
SRaftId fromId;
int64_t startTime;
// update when begin
void *pWriter;
SyncTerm term;
SSnapshotParam snapshotParam;
SSnapshot snapshot;
SRaftId fromId;
int64_t startTime;
// init when create
SSyncNode *pSyncNode;
......@@ -82,7 +86,8 @@ void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceive
// on message
int32_t syncNodeOnSnapshot(SSyncNode *ths, SyncSnapshotSend *pMsg);
int32_t syncNodeOnSnapshotReply(SSyncNode *ths, SyncSnapshotRsp *pMsg);
int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId);
// start
#ifdef __cplusplus
}
......
......@@ -185,7 +185,11 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
if (pMsg->prevLogIndex >= startIndex) {
SyncTerm myPreLogTerm = syncNodeGetPreTerm(ths, pMsg->prevLogIndex + 1);
ASSERT(myPreLogTerm != SYNC_TERM_INVALID);
// ASSERT(myPreLogTerm != SYNC_TERM_INVALID);
if (myPreLogTerm == SYNC_TERM_INVALID) {
syncLogRecvAppendEntries(ths, pMsg, "reject, pre-term invalid");
goto _SEND_RESPONSE;
}
if (myPreLogTerm != pMsg->prevLogTerm) {
syncLogRecvAppendEntries(ths, pMsg, "reject, pre-term not match");
......
......@@ -119,7 +119,7 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
ASSERT(pState != NULL);
if (pMsg->lastSendIndex == pState->lastSendIndex) {
syncNodeReplicateOne(ths, &(pMsg->srcId));
syncNodeReplicateOne(ths, &(pMsg->srcId), true);
}
}
......
......@@ -2604,10 +2604,47 @@ void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshot
port, pMsg->term, pMsg->snapStart, s);
}
void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {}
void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"send sync-snapshot-send from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", end:%" PRId64 ", lterm:%" PRId64
", stime:%" PRId64 ", seq:%d}, %s",
host, port, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->seq, s);
}
void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"recv sync-snapshot-send from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64
", stime:%" PRId64 ", seq:%d, len:%u}, %s",
host, port, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->seq,
pMsg->dataLen, s);
}
void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {}
sNTrace(pSyncNode,
"send sync-snapshot-rsp from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64
", stime:%" PRId64 ", ack:%d}, %s",
host, port, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->ack, s);
}
void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {}
void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {}
sNTrace(pSyncNode,
"recv sync-snapshot-rsp from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64
", stime:%" PRId64 ", ack:%d}, %s",
host, port, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->ack, s);
}
......@@ -48,19 +48,20 @@ static int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pNode, const SRaftId* d
// mdest |-> j])
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId) {
int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapshot) {
// next index
SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId);
// maybe start snapshot
SyncIndex logStartIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
SyncIndex logEndIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
if (nextIndex < logStartIndex || nextIndex - 1 > logEndIndex) {
sNTrace(pSyncNode, "maybe start snapshot for next-index:%" PRId64 ", start:%" PRId64 ", end:%" PRId64, nextIndex,
logStartIndex, logEndIndex);
// start snapshot
// int32_t code = syncNodeStartSnapshot(pSyncNode, pDestId);
return 0;
if (snapshot) {
// maybe start snapshot
SyncIndex logStartIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
SyncIndex logEndIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
if (nextIndex < logStartIndex || nextIndex - 1 > logEndIndex) {
sNTrace(pSyncNode, "maybe start snapshot for next-index:%" PRId64 ", start:%" PRId64 ", end:%" PRId64, nextIndex,
logStartIndex, logEndIndex);
// start snapshot
int32_t code = syncNodeStartSnapshot(pSyncNode, pDestId);
}
}
// pre index, pre term
......@@ -124,7 +125,7 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode) {
int32_t ret = 0;
for (int i = 0; i < pSyncNode->peersNum; ++i) {
SRaftId* pDestId = &(pSyncNode->peersId[i]);
ret = syncNodeReplicateOne(pSyncNode, pDestId);
ret = syncNodeReplicateOne(pSyncNode, pDestId, true);
if (ret != 0) {
char host[64];
int16_t port;
......
......@@ -19,6 +19,7 @@
#include "syncRaftCfg.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncReplication.h"
#include "syncUtil.h"
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
......@@ -44,6 +45,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
pSender->replicaIndex = replicaIndex;
pSender->term = pSyncNode->pRaftStore->currentTerm;
pSender->startTime = 0;
pSender->endTime = 0;
pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &(pSender->snapshot));
pSender->finish = false;
} else {
......@@ -119,6 +121,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
SRpcMsg rpcMsg;
syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "");
syncSnapshotSendDestroy(pMsg);
// event log
......@@ -130,6 +133,7 @@ int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
// update flag
pSender->start = false;
pSender->finish = finish;
pSender->endTime = taosGetTimestampMs();
// close reader
if (pSender->pReader != NULL) {
......@@ -191,6 +195,7 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
SRpcMsg rpcMsg;
syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "");
syncSnapshotSendDestroy(pMsg);
// event log
......@@ -226,6 +231,7 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
SRpcMsg rpcMsg;
syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "");
syncSnapshotSendDestroy(pMsg);
// event log
......@@ -241,6 +247,9 @@ static void snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnaps
++(pSender->seq);
}
// return 0, start ok
// return 1, last snapshot finish ok
// return -1, error
int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
sNTrace(pSyncNode, "starting snapshot ...");
......@@ -253,11 +262,14 @@ int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
int32_t code = 0;
if (snapshotSenderIsStart(pSender)) {
code = snapshotSenderStop(pSender, false);
if (code != 0) {
sNError(pSyncNode, "snapshot sender stop error");
return -1;
}
sNTrace(pSyncNode, "snapshot sender already start, ignore");
return 0;
}
if (!snapshotSenderIsStart(pSender) && pSender->finish &&
taosGetTimestampMs() - pSender->endTime < SNAPSHOT_WAIT_MS) {
sNTrace(pSyncNode, "snapshot sender too frequently, ignore");
return 1;
}
code = snapshotSenderStart(pSender);
......@@ -316,36 +328,6 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) { return pReceiver->start; }
// static do start by privateTerm, pBeginMsg
// receive first snapshot data
// write first block data
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
pReceiver->start = true;
pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
// start writer
ASSERT(pReceiver->pWriter == NULL);
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm,
&(pReceiver->snapshotParam), &(pReceiver->pWriter));
ASSERT(ret == 0);
pReceiver->term = pReceiver->pSyncNode->pRaftStore->currentTerm;
pReceiver->snapshotParam.start = pBeginMsg->beginIndex;
pReceiver->snapshotParam.end = pBeginMsg->lastIndex;
pReceiver->fromId = pBeginMsg->srcId;
// update snapshot
pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex;
pReceiver->snapshot.lastApplyTerm = pBeginMsg->lastTerm;
pReceiver->snapshot.lastConfigIndex = pBeginMsg->lastConfigIndex;
pReceiver->startTime = pBeginMsg->startTime;
// event log
sRTrace(pReceiver, "snapshot receiver start");
}
// force stop
void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) {
// force close, abandon incomplete data
......@@ -362,10 +344,43 @@ void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) {
sRTrace(pReceiver, "snapshot receiver force stop");
}
// if receiver receive msg from seq = SYNC_SNAPSHOT_SEQ_BEGIN, start receiver
int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
ASSERT(snapshotReceiverIsStart(pReceiver));
// update ack
pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
// update snapshot
pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex;
pReceiver->snapshot.lastApplyTerm = pBeginMsg->lastTerm;
pReceiver->snapshot.lastConfigIndex = pBeginMsg->lastConfigIndex;
pReceiver->snapshotParam.start = pBeginMsg->beginIndex;
pReceiver->snapshotParam.end = pBeginMsg->lastIndex;
// start writer
ASSERT(pReceiver->pWriter == NULL);
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm,
&(pReceiver->snapshotParam), &(pReceiver->pWriter));
ASSERT(ret == 0);
// event log
sRTrace(pReceiver, "snapshot receiver start writer");
return 0;
}
int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pPreMsg) {
ASSERT(!snapshotReceiverIsStart(pReceiver));
snapshotReceiverDoStart(pReceiver, pBeginMsg);
pReceiver->start = true;
pReceiver->ack = SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT;
pReceiver->term = pReceiver->pSyncNode->pRaftStore->currentTerm;
pReceiver->fromId = pPreMsg->srcId;
pReceiver->startTime = pPreMsg->startTime;
// event log
sRTrace(pReceiver, "snapshot receiver start");
return 0;
}
......@@ -518,7 +533,10 @@ _START_RECEIVER:
return -1;
} else {
// waiting for clock match
while (taosGetTimestampMs() > pMsg->startTime) {
int64_t timeNow = taosGetTimestampMs();
while (timeNow < pMsg->startTime) {
sNTrace(pSyncNode, "snapshot receiver pre waitting for true time, now:%" PRId64 ", stime:%" PRId64, timeNow,
pMsg->startTime);
taosMsleep(10);
}
......@@ -543,6 +561,7 @@ _SEND_REPLY:
SRpcMsg rpcMsg;
syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg);
syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg);
syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "");
syncSnapshotRspDestroy(pRspMsg);
return 0;
......@@ -552,54 +571,119 @@ static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *p
// condition 1
SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
if (snapshotReceiverIsStart(pReceiver)) {
if (pMsg->startTime > pReceiver->startTime) {
snapshotReceiverStop(pReceiver);
} else if (pMsg->startTime == pReceiver->startTime) {
return 0;
} else {
// ignore
sNTrace(pSyncNode, "msg ignore");
return 0;
}
if (!snapshotReceiverIsStart(pReceiver)) {
sNError(pSyncNode, "snapshot receiver not start");
return -1;
}
_START_RECEIVER:
if (taosGetTimestampMs() - pMsg->startTime > SNAPSHOT_MAX_CLOCK_SKEW_MS) {
sNError(pSyncNode, "snapshot receiver time skew too much");
if (pReceiver->startTime != pMsg->startTime) {
sNError(pSyncNode, "snapshot receiver time not equal");
return -1;
} else {
// waiting for clock match
while (taosGetTimestampMs() > pMsg->startTime) {
taosMsleep(10);
}
}
snapshotReceiverStart(pReceiver, pMsg);
// start writer
snapshotReceiverStartWriter(pReceiver, pMsg);
// build msg
SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId);
pRspMsg->srcId = pSyncNode->myRaftId;
pRspMsg->destId = pMsg->srcId;
pRspMsg->term = pSyncNode->pRaftStore->currentTerm;
pRspMsg->lastIndex = pMsg->lastIndex;
pRspMsg->lastTerm = pMsg->lastTerm;
pRspMsg->ack = pReceiver->ack; // receiver maybe already closed
pRspMsg->code = 0;
// build msg
SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId);
pRspMsg->srcId = pSyncNode->myRaftId;
pRspMsg->destId = pMsg->srcId;
pRspMsg->term = pSyncNode->pRaftStore->currentTerm;
pRspMsg->lastIndex = pMsg->lastIndex;
pRspMsg->lastTerm = pMsg->lastTerm;
pRspMsg->startTime = pReceiver->startTime;
pRspMsg->ack = pReceiver->ack; // receiver maybe already closed
pRspMsg->code = 0;
pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;
// send msg
SRpcMsg rpcMsg;
syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg);
syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg);
syncSnapshotRspDestroy(pRspMsg);
// send msg
SRpcMsg rpcMsg;
syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg);
syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg);
syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "");
syncSnapshotRspDestroy(pRspMsg);
return 0;
}
static int32_t syncNodeOnSnapshotTransfering(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
// condition 4
// transfering
SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
// waiting for clock match
int64_t timeNow = taosGetTimestampMs();
while (timeNow < pMsg->startTime) {
sNTrace(pSyncNode, "snapshot receiver transfering waitting for true time, now:%" PRId64 ", stime:%" PRId64, timeNow,
pMsg->startTime);
taosMsleep(10);
}
if (pMsg->seq == pReceiver->ack + 1) {
snapshotReceiverGotData(pReceiver, pMsg);
}
// build msg
SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId);
pRspMsg->srcId = pSyncNode->myRaftId;
pRspMsg->destId = pMsg->srcId;
pRspMsg->term = pSyncNode->pRaftStore->currentTerm;
pRspMsg->lastIndex = pMsg->lastIndex;
pRspMsg->lastTerm = pMsg->lastTerm;
pRspMsg->startTime = pReceiver->startTime;
pRspMsg->ack = pReceiver->ack; // receiver maybe already closed
pRspMsg->code = 0;
pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;
// send msg
SRpcMsg rpcMsg;
syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg);
syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg);
syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "");
syncSnapshotRspDestroy(pRspMsg);
return 0;
}
static int32_t syncNodeOnSnapshotTransfer(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { return 0; }
static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
// condition 2
// end, finish FSM
SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
// waiting for clock match
int64_t timeNow = taosGetTimestampMs();
while (timeNow < pMsg->startTime) {
sNTrace(pSyncNode, "snapshot receiver finish waitting for true time, now:%" PRId64 ", stime:%" PRId64, timeNow,
pMsg->startTime);
taosMsleep(10);
}
int32_t code = snapshotReceiverFinish(pReceiver, pMsg);
if (code == 0) {
snapshotReceiverStop(pReceiver);
}
// build msg
SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId);
pRspMsg->srcId = pSyncNode->myRaftId;
pRspMsg->destId = pMsg->srcId;
pRspMsg->term = pSyncNode->pRaftStore->currentTerm;
pRspMsg->lastIndex = pMsg->lastIndex;
pRspMsg->lastTerm = pMsg->lastTerm;
pRspMsg->startTime = pReceiver->startTime;
pRspMsg->ack = pReceiver->ack; // receiver maybe already closed
pRspMsg->code = 0;
pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;
// send msg
SRpcMsg rpcMsg;
syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg);
syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg);
syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "");
syncSnapshotRspDestroy(pRspMsg);
static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { return 0; }
return 0;
}
// receiver on message
//
......@@ -641,6 +725,8 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
int32_t code = 0;
SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "");
// state, term, seq/ack
if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
......@@ -651,39 +737,14 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
syncNodeOnSnapshotBegin(pSyncNode, pMsg);
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
// condition 2
// end, finish FSM
code = snapshotReceiverFinish(pReceiver, pMsg);
if (code == 0) {
snapshotReceiverStop(pReceiver);
}
bool needRsp = true;
// maybe update lastconfig
if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) {
SSyncCfg oldSyncCfg = pSyncNode->pRaftCfg->cfg;
// update new config myIndex
SSyncCfg newSyncCfg = pMsg->lastConfig;
syncNodeUpdateNewConfigIndex(pSyncNode, &newSyncCfg);
// do config change
syncNodeDoConfigChange(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex);
}
syncNodeOnSnapshotEnd(pSyncNode, pMsg);
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
// condition 3
// force close
// force close, no response
snapshotReceiverForceStop(pReceiver);
bool needRsp = false;
} else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
// condition 4
// transfering
if (pMsg->seq == pReceiver->ack + 1) {
snapshotReceiverGotData(pReceiver, pMsg);
}
bool needRsp = true;
syncNodeOnSnapshotTransfering(pSyncNode, pMsg);
} else {
// error log
......@@ -717,11 +778,17 @@ int32_t syncNodeOnSnapshotReplyPre(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg)
pSender->snapshotParam.start = pMsg->snapBeginIndex;
pSender->snapshotParam.end = snapshot.lastApplyIndex;
sNTrace(pSyncNode, "prepare snapshot, recv-begin:%" PRId64 ", snapshot.last:%" PRId64 ", snapshot.term:%" PRId64,
pMsg->snapBeginIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
if (pMsg->snapBeginIndex > snapshot.lastApplyIndex) {
sNError(pSyncNode, "snapshot last index too small");
return -1;
}
// update sender
pSender->snapshot = snapshot;
// start reader
int32_t code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &(pSender->snapshotParam), &(pSender->pReader));
if (code != 0) {
......@@ -729,6 +796,12 @@ int32_t syncNodeOnSnapshotReplyPre(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg)
return -1;
}
// update next index
syncIndexMgrSetIndex(pSyncNode->pNextIndex, &(pMsg->srcId), snapshot.lastApplyIndex + 1);
// update seq
pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
// build begin msg
SyncSnapshotSend *pSendMsg = syncSnapshotSendBuild(0, pSender->pSyncNode->vgId);
pSendMsg->srcId = pSender->pSyncNode->myRaftId;
......@@ -746,6 +819,7 @@ int32_t syncNodeOnSnapshotReplyPre(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg)
SRpcMsg rpcMsg;
syncSnapshotSend2RpcMsg(pSendMsg, &rpcMsg);
syncNodeSendMsgById(&(pSendMsg->destId), pSender->pSyncNode, &rpcMsg);
syncLogSendSyncSnapshotSend(pSyncNode, pSendMsg, "");
syncSnapshotSendDestroy(pSendMsg);
return 0;
......@@ -773,6 +847,8 @@ int32_t syncNodeOnSnapshotReply(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
return -1;
}
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "");
// state, term, seq/ack
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
......@@ -782,9 +858,20 @@ int32_t syncNodeOnSnapshotReply(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
return 0;
}
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_BEGIN) {
snapshotSenderUpdateProgress(pSender, pMsg);
snapshotSend(pSender);
return 0;
}
// receive ack is finish, close sender
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
snapshotSenderStop(pSender, true);
// update next-index
syncIndexMgrSetIndex(pSyncNode->pNextIndex, &(pMsg->srcId), pMsg->lastIndex + 1);
syncNodeReplicateOne(pSyncNode, &(pMsg->srcId), false);
return 0;
}
......
......@@ -60,13 +60,13 @@ void syncRespMgrGetTest(uint64_t i) {
void syncRespMgrGetAndDelTest(uint64_t i) {
printf("------syncRespMgrGetAndDelTest-------%" PRIu64 "-- \n", i);
// SRespStub stub;
// int32_t ret = syncRespMgrGetAndDel(pMgr, i, &stub);
// if (ret == 1) {
// printStub(&stub);
// } else if (ret == 0) {
// printf("%" PRId64 " notFound \n", i);
// }
SRpcHandleInfo stub;
int32_t ret = syncRespMgrGetAndDel(pMgr, i, &stub);
if (ret == 1) {
//printStub(&stub);
} else if (ret == 0) {
printf("%" PRId64 " notFound \n", i);
}
}
SSyncNode *createSyncNode() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册