提交 f924c1b8 编写于 作者: M Minghao Li

sync index

上级 7c868aea
......@@ -232,6 +232,7 @@ void syncNodeCandidate2Follower(SSyncNode* pSyncNode);
// raft vote --------------
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId);
void syncNodeVoteForSelf(SSyncNode* pSyncNode);
void syncNodeMaybeAdvanceCommitIndex(SSyncNode* pSyncNode);
// for debug --------------
void syncNodePrint(SSyncNode* pObj);
......
......@@ -196,7 +196,7 @@ typedef struct SyncRequestVote {
SRaftId srcId;
SRaftId destId;
// private data
SyncTerm currentTerm;
SyncTerm term;
SyncIndex lastLogIndex;
SyncTerm lastLogTerm;
} SyncRequestVote;
......@@ -287,6 +287,7 @@ typedef struct SyncAppendEntriesReply {
SRaftId srcId;
SRaftId destId;
// private data
SyncTerm term;
bool success;
SyncIndex matchIndex;
} SyncAppendEntriesReply;
......
......@@ -80,4 +80,9 @@
// /\ UNCHANGED <<serverVars, commitIndex, messages>>
// /\ UNCHANGED <<candidateVars, leaderVars>>
//
int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {}
int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
int32_t ret = 0;
syncAppendEntriesLog2("==syncNodeOnAppendEntriesCb==", pMsg);
return ret;
}
......@@ -14,6 +14,12 @@
*/
#include "syncAppendEntriesReply.h"
#include "syncIndexMgr.h"
#include "syncInt.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "syncVoteMgr.h"
// TLA+ Spec
// HandleAppendEntriesResponse(i, j, m) ==
......@@ -28,4 +34,41 @@
// /\ Discard(m)
// /\ UNCHANGED <<serverVars, candidateVars, logVars, elections>>
//
int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {}
int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
int32_t ret = 0;
syncAppendEntriesReplyLog2("==syncNodeOnAppendEntriesReplyCb==", pMsg);
if (pMsg->term < ths->pRaftStore->currentTerm) {
sTrace("DropStaleResponse, receive term:%lu, current term:%lu", pMsg->term, ths->pRaftStore->currentTerm);
return ret;
}
// no need this code, because if I receive reply.term, then I must have sent for that term.
// if (pMsg->term > ths->pRaftStore->currentTerm) {
// syncNodeUpdateTerm(ths, pMsg->term);
// }
assert(pMsg->term == ths->pRaftStore->currentTerm);
if (pMsg->success) {
// nextIndex = reply.matchIndex + 1
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1);
// matchIndex = reply.matchIndex
syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex);
// maybe commit
syncNodeMaybeAdvanceCommitIndex(ths);
} else {
SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
if (nextIndex > SYNC_INDEX_BEGIN) {
--nextIndex;
} else {
nextIndex = SYNC_INDEX_BEGIN;
}
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex);
}
return ret;
}
......@@ -38,7 +38,7 @@ int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode) {
SyncRequestVote* pMsg = syncRequestVoteBuild();
pMsg->srcId = pSyncNode->myRaftId;
pMsg->destId = pSyncNode->peersId[i];
pMsg->currentTerm = pSyncNode->pRaftStore->currentTerm;
pMsg->term = pSyncNode->pRaftStore->currentTerm;
pMsg->lastLogIndex = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore);
pMsg->lastLogTerm = pSyncNode->pLogStore->getLastTerm(pSyncNode->pLogStore);
......
......@@ -587,6 +587,8 @@ void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
syncRequestVoteReplyDestroy(pMsg);
}
void syncNodeMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {}
// for debug --------------
void syncNodePrint(SSyncNode* pObj) {
char* serialized = syncNode2Str(pObj);
......
......@@ -820,8 +820,8 @@ cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg) {
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
cJSON_AddItemToObject(pRoot, "destId", pDestId);
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->currentTerm);
cJSON_AddStringToObject(pRoot, "currentTerm", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term);
cJSON_AddStringToObject(pRoot, "term", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->lastLogIndex);
cJSON_AddStringToObject(pRoot, "lastLogIndex", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->lastLogTerm);
......@@ -1264,9 +1264,11 @@ cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg) {
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
cJSON_AddItemToObject(pRoot, "destId", pDestId);
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term);
cJSON_AddStringToObject(pRoot, "term", u64buf);
cJSON_AddNumberToObject(pRoot, "success", pMsg->success);
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->matchIndex);
cJSON_AddStringToObject(pRoot, "match_index", u64buf);
cJSON_AddStringToObject(pRoot, "matchIndex", u64buf);
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SyncAppendEntriesReply", pRoot);
......
......@@ -17,6 +17,8 @@
#include "syncIndexMgr.h"
#include "syncMessage.h"
#include "syncRaftEntry.h"
#include "syncRaftLog.h"
#include "syncUtil.h"
// TLA+ Spec
// AppendEntries(i, j) ==
......@@ -50,16 +52,31 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {
for (int i = 0; i < pSyncNode->peersNum; ++i) {
SRaftId* pDestId = &(pSyncNode->peersId[i]);
SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId);
SyncIndex preLogIndex = nextIndex - 1;
SyncTerm preLogTerm = 0;
if (preLogIndex >= 0) {
if (preLogIndex >= SYNC_INDEX_BEGIN) {
SSyncRaftEntry* pPreEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, preLogIndex);
preLogTerm = pPreEntry->term;
} else {
preLogTerm = 0;
}
// SyncTerm lastIndex =
// pSyncNode->pLogStore->getLastIndex < nextIndex ? pSyncNode->pLogStore->getLastIndex : nextIndex;
SyncIndex lastIndex = syncUtilMinIndex(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore), nextIndex);
assert(nextIndex == lastIndex);
SSyncRaftEntry* pEntry = logStoreGetEntry(pSyncNode->pLogStore, nextIndex);
assert(pEntry != NULL);
SyncAppendEntries* pMsg = syncAppendEntriesBuild(pEntry->bytes);
pMsg->srcId = pSyncNode->myRaftId;
pMsg->destId = *pDestId;
pMsg->prevLogIndex = preLogIndex;
pMsg->prevLogTerm = preLogTerm;
pMsg->commitIndex = pSyncNode->commitIndex;
pMsg->dataLen = pEntry->bytes;
// add pEntry into msg
syncNodeAppendEntries(pSyncNode, pDestId, pMsg);
}
return ret;
......
......@@ -14,6 +14,10 @@
*/
#include "syncRequestVote.h"
#include "syncInt.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "syncVoteMgr.h"
// TLA+ Spec
// HandleRequestVoteRequest(i, j, m) ==
......@@ -37,4 +41,34 @@
// m)
// /\ UNCHANGED <<state, currentTerm, candidateVars, leaderVars, logVars>>
//
int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {}
int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {
int32_t ret = 0;
syncRequestVoteLog2("==syncNodeOnRequestVoteCb==", pMsg);
if (pMsg->term > ths->pRaftStore->currentTerm) {
syncNodeUpdateTerm(ths, pMsg->term);
}
assert(pMsg->term <= ths->pRaftStore->currentTerm);
bool logOK = (pMsg->lastLogTerm > ths->pLogStore->getLastTerm(ths->pLogStore)) ||
((pMsg->lastLogTerm == ths->pLogStore->getLastTerm(ths->pLogStore)) &&
(pMsg->lastLogIndex >= ths->pLogStore->getLastIndex(ths->pLogStore)));
bool grant = (pMsg->term == ths->pRaftStore->currentTerm) && logOK &&
((!raftStoreHasVoted(ths->pRaftStore)) || (syncUtilSameId(&(ths->pRaftStore->voteFor), &(pMsg->srcId))));
if (grant) {
raftStoreVote(ths->pRaftStore, &(pMsg->srcId));
}
SyncRequestVoteReply* pReply = syncRequestVoteReplyBuild();
pReply->srcId = ths->myRaftId;
pReply->destId = pMsg->srcId;
pReply->term = ths->pRaftStore->currentTerm;
pReply->voteGranted = grant;
SRpcMsg rpcMsg;
syncRequestVoteReply2RpcMsg(pReply, &rpcMsg);
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
syncRequestVoteReplyDestroy(pReply);
return ret;
}
......@@ -14,6 +14,10 @@
*/
#include "syncRequestVoteReply.h"
#include "syncInt.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "syncVoteMgr.h"
// TLA+ Spec
// HandleRequestVoteResponse(i, j, m) ==
......@@ -32,4 +36,33 @@
// /\ Discard(m)
// /\ UNCHANGED <<serverVars, votedFor, leaderVars, logVars>>
//
int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) {}
int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) {
int32_t ret = 0;
syncRequestVoteReplyLog2("==syncNodeOnRequestVoteReplyCb==", pMsg);
if (pMsg->term < ths->pRaftStore->currentTerm) {
sTrace("DropStaleResponse, receive term:%lu, current term:%lu", pMsg->term, ths->pRaftStore->currentTerm);
return ret;
}
// no need this code, because if I receive reply.term, then I must have sent for that term.
// if (pMsg->term > ths->pRaftStore->currentTerm) {
// syncNodeUpdateTerm(ths, pMsg->term);
// }
assert(pMsg->term == ths->pRaftStore->currentTerm);
if (ths->state == TAOS_SYNC_STATE_CANDIDATE) {
votesRespondAdd(ths->pVotesRespond, pMsg);
if (pMsg->voteGranted) {
voteGrantedVote(ths->pVotesGranted, pMsg);
if (voteGrantedMajority(ths->pVotesGranted)) {
if (ths->pVotesGranted->toLeader) {
syncNodeCandidate2Leader(ths);
ths->pVotesGranted->toLeader = true;
}
}
}
}
return ret;
}
......@@ -20,7 +20,7 @@ SyncRequestVote *createMsg() {
pMsg->srcId.vgId = 100;
pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678);
pMsg->destId.vgId = 100;
pMsg->currentTerm = 11;
pMsg->term = 11;
pMsg->lastLogIndex = 22;
pMsg->lastLogTerm = 33;
return pMsg;
......
......@@ -57,7 +57,7 @@ SyncRequestVote *createSyncRequestVote() {
pMsg->srcId.vgId = 100;
pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678);
pMsg->destId.vgId = 100;
pMsg->currentTerm = 11;
pMsg->term = 11;
pMsg->lastLogIndex = 22;
pMsg->lastLogTerm = 33;
return pMsg;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册