提交 6b329f79 编写于 作者: M Minghao Li

enh(sync): update raft core functions

上级 af27303a
......@@ -152,11 +152,9 @@ typedef struct SSyncNode {
SSyncRespMgr* pSyncRespMgr;
// restore state
// sem_t restoreSem;
bool restoreFinish;
// SSnapshot* pSnapshot;
SSyncSnapshotSender* senders[TSDB_MAX_REPLICA];
SSyncSnapshotReceiver* receivers[TSDB_MAX_REPLICA];
SSyncSnapshotReceiver* pNewNodeReceiver;
} SSyncNode;
......
......@@ -19,6 +19,7 @@
#include "syncInt.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncSnapshot.h"
#include "syncUtil.h"
#include "syncVoteMgr.h"
......@@ -137,7 +138,9 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex);
// maybe commit
syncMaybeAdvanceCommitIndex(ths);
if (ths->state == TAOS_SYNC_STATE_LEADER) {
syncMaybeAdvanceCommitIndex(ths);
}
} else {
SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
......@@ -151,8 +154,26 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
SSnapshot snapshot;
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);
if (nextIndex <= snapshot.lastApplyIndex) {
ASSERT(nextIndex == snapshot.lastApplyIndex);
nextIndex = snapshot.lastApplyIndex + 1;
sInfo("reset new nextIndex %ld, snapshot.lastApplyIndex:%ld", nextIndex, snapshot.lastApplyIndex);
// start send snapshot
// get sender
SSyncSnapshotSender* pSender = NULL;
for (int i = 0; i < ths->replicaNum; ++i) {
if (syncUtilSameId(&(pMsg->srcId), &((ths->replicasId)[i]))) {
pSender = (ths->senders)[i];
}
}
ASSERT(pSender != NULL);
if (!(pSender->term == ths->pRaftStore->currentTerm && pSender->finish == true)) {
snapshotSenderStart(pSender);
} else {
sInfo("snapshot send finish, send_term:%lu, current_term:%lu", pSender->term, ths->pRaftStore->currentTerm);
}
}
}
......
......@@ -598,12 +598,6 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
}
// snapshot receivers
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
SSyncSnapshotReceiver* pReceiver = snapshotReceiverCreate(pSyncNode, i);
// ASSERT(pReceiver != NULL);
(pSyncNode->receivers)[i] = pReceiver;
}
pSyncNode->pNewNodeReceiver = snapshotReceiverCreate(pSyncNode, 100);
// start in syncNodeStart
......@@ -705,13 +699,6 @@ void syncNodeClose(SSyncNode* pSyncNode) {
}
}
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
if ((pSyncNode->receivers)[i] != NULL) {
snapshotReceiverDestroy((pSyncNode->receivers)[i]);
(pSyncNode->receivers)[i] = NULL;
}
}
if (pSyncNode->pNewNodeReceiver != NULL) {
snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
pSyncNode->pNewNodeReceiver = NULL;
......@@ -1025,10 +1012,7 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
// snapshot receivers
cJSON* pReceivers = cJSON_CreateArray();
cJSON_AddItemToObject(pRoot, "receivers", pReceivers);
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
cJSON_AddItemToArray(pReceivers, snapshotReceiver2Json((pSyncNode->receivers)[i]));
}
cJSON_AddItemToObject(pRoot, "receiver", snapshotReceiver2Json(pSyncNode->pNewNodeReceiver));
}
cJSON* pJson = cJSON_CreateObject();
......
......@@ -284,7 +284,7 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
cJSON *pJson = snapshotSender2Json(pSender);
char * serialized = cJSON_Print(pJson);
char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
......@@ -398,7 +398,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
cJSON *pJson = snapshotReceiver2Json(pReceiver);
char * serialized = cJSON_Print(pJson);
char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
......@@ -406,19 +406,8 @@ char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
// receiver do something
int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
// get receiver
SSyncSnapshotReceiver *pReceiver = NULL;
for (int i = 0; i < pSyncNode->replicaNum; ++i) {
if (syncUtilSameId(&(pMsg->srcId), &((pSyncNode->replicasId)[i]))) {
pReceiver = (pSyncNode->receivers)[i];
}
}
// add new replica
if (pReceiver == NULL) {
pReceiver = pSyncNode->pNewNodeReceiver;
}
bool needRsp = false;
SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
bool needRsp = false;
// state, term, seq/ack
if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册