提交 29b97fa0 编写于 作者: M Minghao Li

refactor(sync): syncNodeOnAppendEntriesReplySnapshotCb

上级 aa107d03
......@@ -398,7 +398,7 @@ typedef struct SyncSnapshotSend {
SyncTerm term;
SyncIndex lastIndex; // lastIndex of snapshot
SyncTerm lastTerm; // lastTerm of snapshot
SyncTerm privateTerm;
SyncTerm privateTerm;
int32_t seq;
uint32_t dataLen;
char data[];
......@@ -433,8 +433,9 @@ typedef struct SyncSnapshotRsp {
SyncTerm term;
SyncIndex lastIndex;
SyncTerm lastTerm;
SyncTerm privateTerm;
SyncTerm privateTerm;
int32_t ack;
int32_t code;
} SyncSnapshotRsp;
SyncSnapshotRsp* syncSnapshotRspBuild(int32_t vgId);
......
......@@ -48,7 +48,7 @@ typedef struct SSyncSnapshotSender {
int32_t replicaIndex;
SyncTerm term;
SyncTerm privateTerm;
bool apply;
bool finish;
} SSyncSnapshotSender;
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex);
......@@ -75,9 +75,9 @@ typedef struct SSyncSnapshotReceiver {
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, int32_t replicaIndex);
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver);
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver);
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm);
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver);
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver);
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply);
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver);
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
......
......@@ -101,7 +101,8 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
// print log
char logBuf[128] = {0};
snprintf(logBuf, sizeof(logBuf), "recv SyncAppendEntriesReply, term:%lu", ths->pRaftStore->currentTerm);
snprintf(logBuf, sizeof(logBuf), "recv SyncAppendEntriesReply, vgId:%d, term:%lu", ths->vgId,
ths->pRaftStore->currentTerm);
syncAppendEntriesReplyLog2(logBuf, pMsg);
// if already drop replica, do not process
......@@ -145,7 +146,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
if (pMsg->success) {
// nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1]
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1);
sTrace("update next index:%ld, success:%d", pMsg->matchIndex + 1, pMsg->success);
sTrace("update next match, index:%ld, success:%d", pMsg->matchIndex + 1, pMsg->success);
// matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex]
syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex);
......@@ -157,49 +158,35 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
} else {
SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
sTrace("begin to update next index:%ld, success:%d", nextIndex, pMsg->success);
sTrace("update next not match, begin, index:%ld, success:%d", nextIndex, pMsg->success);
// notice! int64, uint64
if (nextIndex > SYNC_INDEX_BEGIN) {
--nextIndex;
// has snapshot
if (syncNodeHasSnapshot(ths)) {
// get sender
SSyncSnapshotSender* pSender = NULL;
for (int i = 0; i < ths->replicaNum; ++i) {
if (syncUtilSameId(&(pMsg->srcId), &((ths->replicasId)[i]))) {
pSender = (ths->senders)[i];
}
}
ASSERT(pSender != NULL);
SyncIndex sentryIndex;
if (pSender->start && pSender->term == ths->pRaftStore->currentTerm) {
// already start
sentryIndex = pSender->snapshot.lastApplyIndex;
sTrace(
"sending snapshot already start: pSender->term:%lu, ths->pRaftStore->currentTerm:%lu, "
"pSender->privateTerm:%lu",
pSender->term, ths->pRaftStore->currentTerm, pSender->privateTerm);
} else {
if (pMsg->privateTerm >= pSender->privateTerm) {
// donot start again
sentryIndex = pSender->snapshot.lastApplyIndex;
} else {
// start first time
snapshotSenderStart(pSender);
pSender->start = true;
sentryIndex = pSender->snapshot.lastApplyIndex;
}
}
// update nextIndex to sentryIndex + 1
if (nextIndex <= sentryIndex) {
nextIndex = sentryIndex + 1;
}
// get sender
SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId));
ASSERT(pSender != NULL);
bool hasSnapshot = syncNodeHasSnapshot(ths);
SSnapshot snapshot;
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);
// start sending snapshot first time
// start here, stop by receiver
if (hasSnapshot && nextIndex <= snapshot.lastApplyIndex + 1 && !snapshotSenderIsStart(pSender) &&
pMsg->privateTerm < pSender->privateTerm) {
snapshotSenderStart(pSender);
char* s = snapshotSender2Str(pSender);
sInfo("snapshot send, start sender first time, sender:%s", s);
taosMemoryFree(s);
}
SyncIndex sentryIndex = pSender->snapshot.lastApplyIndex + 1;
// update nextIndex to sentryIndex
if (nextIndex <= sentryIndex) {
nextIndex = sentryIndex;
}
} else {
......@@ -207,7 +194,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
}
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex);
sTrace("update next index:%ld, success:%d", nextIndex, pMsg->success);
sTrace("update next not match, end, index:%ld, success:%d", nextIndex, pMsg->success);
}
syncIndexMgrLog2("recv SyncAppendEntriesReply, after pNextIndex:", ths->pNextIndex);
......
......@@ -1991,6 +1991,7 @@ cJSON* syncSnapshotRsp2Json(const SyncSnapshotRsp* pMsg) {
cJSON_AddStringToObject(pRoot, "lastTerm", u64buf);
cJSON_AddNumberToObject(pRoot, "ack", pMsg->ack);
cJSON_AddNumberToObject(pRoot, "code", pMsg->code);
}
cJSON* pJson = cJSON_CreateObject();
......
......@@ -20,8 +20,9 @@
#include "syncUtil.h"
#include "wal.h"
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver);
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm);
//----------------------------------
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
bool condition = (pSyncNode->pFsm->FpSnapshotStartRead != NULL) && (pSyncNode->pFsm->FpSnapshotStopRead != NULL) &&
(pSyncNode->pFsm->FpSnapshotDoRead != NULL);
......@@ -44,7 +45,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
pSender->term = pSyncNode->pRaftStore->currentTerm;
pSender->privateTerm = taosGetTimestampMs() + 100;
pSender->pSyncNode->pFsm->FpGetSnapshot(pSender->pSyncNode->pFsm, &(pSender->snapshot));
pSender->apply = false;
pSender->finish = false;
} else {
sError("snapshotSenderCreate cannot create sender");
}
......@@ -86,7 +87,7 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) {
pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS;
pSender->term = pSender->pSyncNode->pRaftStore->currentTerm;
++(pSender->privateTerm);
pSender->apply = false;
pSender->finish = false;
pSender->start = true;
// build begin msg
......@@ -299,7 +300,7 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
cJSON_AddStringToObject(pRoot, "term", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", pSender->privateTerm);
cJSON_AddStringToObject(pRoot, "privateTerm", u64buf);
cJSON_AddNumberToObject(pRoot, "apply", pSender->apply);
cJSON_AddNumberToObject(pRoot, "finish", pSender->finish);
}
cJSON *pJson = cJSON_CreateObject();
......@@ -349,8 +350,9 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) { return pReceiver->start; }
// begin receive snapshot msg (current term, seq begin)
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver) {
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm) {
pReceiver->term = pReceiver->pSyncNode->pRaftStore->currentTerm;
pReceiver->privateTerm = privateTerm;
pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
ASSERT(pReceiver->pWriter == NULL);
......@@ -360,10 +362,10 @@ static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver) {
// if receiver receive msg from seq = SYNC_SNAPSHOT_SEQ_BEGIN, start receiver
// if already start, force close, start again
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver) {
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm) {
if (!snapshotReceiverIsStart(pReceiver)) {
// start
snapshotReceiverDoStart(pReceiver);
snapshotReceiverDoStart(pReceiver, privateTerm);
pReceiver->start = true;
} else {
......@@ -376,7 +378,7 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver) {
pReceiver->pWriter = NULL;
// start again
snapshotReceiverDoStart(pReceiver);
snapshotReceiverDoStart(pReceiver, privateTerm);
pReceiver->start = true;
ASSERT(0);
......@@ -387,7 +389,7 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver) {
taosMemoryFree(s);
}
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply) {
if (pReceiver->pWriter != NULL) {
int32_t ret =
pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false);
......@@ -396,7 +398,10 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
}
pReceiver->start = false;
++(pReceiver->privateTerm);
if (apply) {
++(pReceiver->privateTerm);
}
char *s = snapshotReceiver2Str(pReceiver);
sInfo("snapshotReceiverStop %s", s);
......@@ -441,15 +446,15 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
// get receiver
SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
bool needRsp = false;
int32_t writeCode = 0;
// state, term, seq/ack
if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
// begin
snapshotReceiverStart(pReceiver);
snapshotReceiverStart(pReceiver, pMsg->privateTerm);
pReceiver->ack = pMsg->seq;
pReceiver->privateTerm = pMsg->privateTerm;
needRsp = true;
char *msgStr = syncSnapshotSend2Str(pMsg);
......@@ -459,7 +464,9 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
// end, finish FSM
pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen);
writeCode = pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen);
ASSERT(writeCode == 0);
pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, true);
pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, pMsg->lastIndex + 1);
......@@ -472,11 +479,8 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm, logSimpleStr);
taosMemoryFree(logSimpleStr);
// walRestoreFromSnapshot(pSyncNode->pWal, pMsg->lastIndex);
// sInfo("walRestoreFromSnapshot lastIndex:%ld", pMsg->lastIndex);
pReceiver->pWriter = NULL;
snapshotReceiverStop(pReceiver);
snapshotReceiverStop(pReceiver, true);
pReceiver->ack = pMsg->seq;
needRsp = true;
......@@ -487,7 +491,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, false);
snapshotReceiverStop(pReceiver);
snapshotReceiverStop(pReceiver, false);
needRsp = false;
char *msgStr = syncSnapshotSend2Str(pMsg);
......@@ -499,7 +503,9 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
} else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
// transfering
if (pMsg->seq == pReceiver->ack + 1) {
pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen);
writeCode =
pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen);
ASSERT(writeCode == 0);
pReceiver->ack = pMsg->seq;
}
needRsp = true;
......@@ -521,6 +527,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
pRspMsg->lastIndex = pMsg->lastIndex;
pRspMsg->lastTerm = pMsg->lastTerm;
pRspMsg->ack = pReceiver->ack;
pRspMsg->code = writeCode;
pRspMsg->privateTerm = pReceiver->privateTerm;
SRpcMsg rpcMsg;
......@@ -541,12 +548,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
// if ack == SYNC_SNAPSHOT_SEQ_END, stop sender
int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
// get sender
SSyncSnapshotSender *pSender = NULL;
for (int i = 0; i < pSyncNode->replicaNum; ++i) {
if (syncUtilSameId(&(pMsg->srcId), &((pSyncNode->replicasId)[i]))) {
pSender = (pSyncNode->senders)[i];
}
}
SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &(pMsg->srcId));
ASSERT(pSender != NULL);
// state, term, seq/ack
......@@ -554,12 +556,8 @@ int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
// receiver ack is finish, close sender
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
pSender->apply = true;
pSender->finish = true;
snapshotSenderStop(pSender);
// update nextIndex private term
// syncIndexMgrSetTerm(pSyncNode->pNextIndex, &(pMsg->srcId), pSender->privateTerm);
return 0;
}
......@@ -569,8 +567,10 @@ int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
pSender->ack = pMsg->ack;
(pSender->seq)++;
snapshotSend(pSender);
} else if (pMsg->ack == pSender->seq - 1) {
snapshotReSend(pSender);
} else {
ASSERT(0);
}
......
......@@ -25,6 +25,7 @@ SyncSnapshotRsp *createMsg() {
pMsg->lastIndex = 22;
pMsg->lastTerm = 33;
pMsg->ack = 44;
pMsg->code = 55;
return pMsg;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册