提交 eba8f2b5 编写于 作者: M Minghao Li

fix(sync) send snapshot multi-times

上级 0e41ab7f
......@@ -41,11 +41,11 @@ void syncIndexMgrDestroy(SSyncIndexMgr *pSyncIndexMgr);
void syncIndexMgrClear(SSyncIndexMgr *pSyncIndexMgr);
void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncIndex index);
SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId);
cJSON * syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr);
char * syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr);
cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr);
char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr);
void syncIndexMgrSetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncTerm term);
SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId);
// void syncIndexMgrSetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncTerm term);
// SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId);
// for debug -------------------
void syncIndexMgrPrint(SSyncIndexMgr *pObj);
......
......@@ -18,6 +18,7 @@
#include "syncRaftCfg.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncSnapshot.h"
#include "syncUtil.h"
#include "syncVoteMgr.h"
#include "wal.h"
......@@ -576,7 +577,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
pReply->term = ths->pRaftStore->currentTerm;
pReply->success = false;
pReply->matchIndex = SYNC_INDEX_INVALID;
pReply->privateTerm = pMsg->privateTerm;
pReply->privateTerm = ths->pNewNodeReceiver->privateTerm;
SRpcMsg rpcMsg;
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
......@@ -661,7 +662,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
pReply->destId = pMsg->srcId;
pReply->term = ths->pRaftStore->currentTerm;
pReply->success = true;
pReply->privateTerm = pMsg->privateTerm;
pReply->privateTerm = ths->pNewNodeReceiver->privateTerm;
if (hasAppendEntries) {
pReply->matchIndex = pMsg->prevLogIndex + 1;
......
......@@ -175,6 +175,26 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
ASSERT(pSender != NULL);
SyncIndex sentryIndex;
if (pSender->start && pSender->term == ths->pRaftStore->currentTerm) {
// already start
sentryIndex = pSender->snapshot.lastApplyIndex;
sTrace(
"sending snapshot already start: pSender->term:%lu, ths->pRaftStore->currentTerm:%lu, "
"pSender->privateTerm:%lu",
pSender->term, ths->pRaftStore->currentTerm, pSender->privateTerm);
} else {
if (pMsg->privateTerm >= pSender->privateTerm) {
// donot start again
sentryIndex = pSender->snapshot.lastApplyIndex;
} else {
// start first time
snapshotSenderDoStart(pSender);
pSender->start = true;
sentryIndex = pSender->snapshot.lastApplyIndex;
}
}
#if 0
SyncIndex sentryIndex;
if (pSender->start && pSender->term == ths->pRaftStore->currentTerm) {
......
......@@ -177,7 +177,8 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
pMsg->prevLogIndex = preLogIndex;
pMsg->prevLogTerm = preLogTerm;
pMsg->commitIndex = pSyncNode->commitIndex;
pMsg->privateTerm = syncIndexMgrGetTerm(pSyncNode->pNextIndex, pDestId);
pMsg->privateTerm = 0;
// pMsg->privateTerm = syncIndexMgrGetTerm(pSyncNode->pNextIndex, pDestId);
// send msg
syncNodeAppendEntries(pSyncNode, pDestId, pMsg);
......
......@@ -43,7 +43,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
pSender->pSyncNode = pSyncNode;
pSender->replicaIndex = replicaIndex;
pSender->term = pSyncNode->pRaftStore->currentTerm;
pSender->privateTerm = 0;
pSender->privateTerm = taosGetTimestampMs() + 100;
pSender->pSyncNode->pFsm->FpGetSnapshot(pSender->pSyncNode->pFsm, &(pSender->snapshot));
pSender->finish = false;
......@@ -62,6 +62,9 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
// begin send snapshot (current term, seq begin)
void snapshotSenderDoStart(SSyncSnapshotSender *pSender) {
// when start, increase term
++(pSender->privateTerm);
pSender->term = pSender->pSyncNode->pRaftStore->currentTerm;
pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
......@@ -82,6 +85,7 @@ void snapshotSenderDoStart(SSyncSnapshotSender *pSender) {
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
pMsg->seq = pSender->seq; // SYNC_SNAPSHOT_SEQ_BEGIN
pMsg->privateTerm = pSender->privateTerm;
// send
SRpcMsg rpcMsg;
......@@ -92,9 +96,6 @@ void snapshotSenderDoStart(SSyncSnapshotSender *pSender) {
sTrace("snapshot send begin seq:%d ack:%d send msg:%s", pSender->seq, pSender->ack, msgStr);
taosMemoryFree(msgStr);
// when start, increase term
++(pSender->privateTerm);
syncSnapshotSendDestroy(pMsg);
}
......@@ -198,6 +199,7 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
pMsg->seq = pSender->seq;
pMsg->privateTerm = pSender->privateTerm;
memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);
SRpcMsg rpcMsg;
......@@ -316,6 +318,8 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, int32_t repl
pReceiver->pSyncNode = pSyncNode;
pReceiver->replicaIndex = replicaIndex;
pReceiver->term = pSyncNode->pRaftStore->currentTerm;
pReceiver->privateTerm = 0;
} else {
sInfo("snapshotReceiverCreate cannot create receiver");
}
......@@ -377,6 +381,7 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
}
pReceiver->start = false;
++(pReceiver->privateTerm);
char *s = snapshotReceiver2Str(pReceiver);
sInfo("snapshotReceiverStop %s", s);
......@@ -429,6 +434,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
// begin
snapshotReceiverStart(pReceiver);
pReceiver->ack = pMsg->seq;
pReceiver->privateTerm = pMsg->privateTerm;
needRsp = true;
char *msgStr = syncSnapshotSend2Str(pMsg);
......@@ -490,6 +496,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
pRspMsg->lastIndex = pMsg->lastIndex;
pRspMsg->lastTerm = pMsg->lastTerm;
pRspMsg->ack = pReceiver->ack;
pRspMsg->privateTerm = pReceiver->privateTerm;
SRpcMsg rpcMsg;
syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg);
......@@ -526,7 +533,7 @@ int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
snapshotSenderStop(pSender);
// update nextIndex private term
syncIndexMgrSetTerm(pSyncNode->pNextIndex, &(pMsg->srcId), pSender->privateTerm);
// syncIndexMgrSetTerm(pSyncNode->pNextIndex, &(pMsg->srcId), pSender->privateTerm);
return 0;
}
......
......@@ -66,9 +66,9 @@ int main(int argc, char** argv) {
syncIndexMgrSetIndex(pSyncIndexMgr, &ids[0], 100);
syncIndexMgrSetIndex(pSyncIndexMgr, &ids[1], 200);
syncIndexMgrSetIndex(pSyncIndexMgr, &ids[2], 300);
syncIndexMgrSetTerm(pSyncIndexMgr, &ids[0], 700);
syncIndexMgrSetTerm(pSyncIndexMgr, &ids[1], 800);
syncIndexMgrSetTerm(pSyncIndexMgr, &ids[2], 900);
//syncIndexMgrSetTerm(pSyncIndexMgr, &ids[0], 700);
//syncIndexMgrSetTerm(pSyncIndexMgr, &ids[1], 800);
//syncIndexMgrSetTerm(pSyncIndexMgr, &ids[2], 900);
{
char* serialized = syncIndexMgr2Str(pSyncIndexMgr);
assert(serialized != NULL);
......@@ -80,8 +80,8 @@ int main(int argc, char** argv) {
printf("---------------------------------------\n");
for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
SyncIndex idx = syncIndexMgrGetIndex(pSyncIndexMgr, &ids[i]);
SyncTerm term = syncIndexMgrGetTerm(pSyncIndexMgr, &ids[i]);
printf("%d: index:%ld term:%lu \n", i, idx, term);
//SyncTerm term = syncIndexMgrGetTerm(pSyncIndexMgr, &ids[i]);
//printf("%d: index:%ld term:%lu \n", i, idx, term);
}
printf("---------------------------------------\n");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册