From f924c1b879f8c38013e31de19e124e96c1d82b4d Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 14 Mar 2022 20:43:35 +0800 Subject: [PATCH] sync index --- source/libs/sync/inc/syncInt.h | 1 + source/libs/sync/inc/syncMessage.h | 3 +- source/libs/sync/src/syncAppendEntries.c | 7 ++- source/libs/sync/src/syncAppendEntriesReply.c | 45 ++++++++++++++++++- source/libs/sync/src/syncElection.c | 2 +- source/libs/sync/src/syncMain.c | 2 + source/libs/sync/src/syncMessage.c | 8 ++-- source/libs/sync/src/syncReplication.c | 29 +++++++++--- source/libs/sync/src/syncRequestVote.c | 36 ++++++++++++++- source/libs/sync/src/syncRequestVoteReply.c | 35 ++++++++++++++- source/libs/sync/test/syncRequestVoteTest.cpp | 2 +- source/libs/sync/test/syncRpcMsgTest.cpp | 2 +- 12 files changed, 155 insertions(+), 17 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 4a1557addd..15c719b76e 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -232,6 +232,7 @@ void syncNodeCandidate2Follower(SSyncNode* pSyncNode); // raft vote -------------- void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId); void syncNodeVoteForSelf(SSyncNode* pSyncNode); +void syncNodeMaybeAdvanceCommitIndex(SSyncNode* pSyncNode); // for debug -------------- void syncNodePrint(SSyncNode* pObj); diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 9bb5b6195e..7dfea31f5c 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -196,7 +196,7 @@ typedef struct SyncRequestVote { SRaftId srcId; SRaftId destId; // private data - SyncTerm currentTerm; + SyncTerm term; SyncIndex lastLogIndex; SyncTerm lastLogTerm; } SyncRequestVote; @@ -287,6 +287,7 @@ typedef struct SyncAppendEntriesReply { SRaftId srcId; SRaftId destId; // private data + SyncTerm term; bool success; SyncIndex matchIndex; } SyncAppendEntriesReply; diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index ba10234a1d..55d369a115 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -80,4 +80,9 @@ // /\ UNCHANGED <> // /\ UNCHANGED <> // -int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {} +int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { + int32_t ret = 0; + syncAppendEntriesLog2("==syncNodeOnAppendEntriesCb==", pMsg); + + return ret; +} diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 0a5120c8dc..61eb4884e2 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -14,6 +14,12 @@ */ #include "syncAppendEntriesReply.h" +#include "syncIndexMgr.h" +#include "syncInt.h" +#include "syncRaftLog.h" +#include "syncRaftStore.h" +#include "syncUtil.h" +#include "syncVoteMgr.h" // TLA+ Spec // HandleAppendEntriesResponse(i, j, m) == @@ -28,4 +34,41 @@ // /\ Discard(m) // /\ UNCHANGED <> // -int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {} +int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { + int32_t ret = 0; + syncAppendEntriesReplyLog2("==syncNodeOnAppendEntriesReplyCb==", pMsg); + + if (pMsg->term < ths->pRaftStore->currentTerm) { + sTrace("DropStaleResponse, receive term:%lu, current term:%lu", pMsg->term, ths->pRaftStore->currentTerm); + return ret; + } + + // no need this code, because if I receive reply.term, then I must have sent for that term. + // if (pMsg->term > ths->pRaftStore->currentTerm) { + // syncNodeUpdateTerm(ths, pMsg->term); + // } + + assert(pMsg->term == ths->pRaftStore->currentTerm); + + if (pMsg->success) { + // nextIndex = reply.matchIndex + 1 + syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1); + + // matchIndex = reply.matchIndex + syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex); + + // maybe commit + syncNodeMaybeAdvanceCommitIndex(ths); + + } else { + SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); + if (nextIndex > SYNC_INDEX_BEGIN) { + --nextIndex; + } else { + nextIndex = SYNC_INDEX_BEGIN; + } + syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex); + } + + return ret; +} diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index d06253c049..77c3d07698 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -38,7 +38,7 @@ int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode) { SyncRequestVote* pMsg = syncRequestVoteBuild(); pMsg->srcId = pSyncNode->myRaftId; pMsg->destId = pSyncNode->peersId[i]; - pMsg->currentTerm = pSyncNode->pRaftStore->currentTerm; + pMsg->term = pSyncNode->pRaftStore->currentTerm; pMsg->lastLogIndex = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore); pMsg->lastLogTerm = pSyncNode->pLogStore->getLastTerm(pSyncNode->pLogStore); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 5ec9df1e70..aaf6535f40 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -587,6 +587,8 @@ void syncNodeVoteForSelf(SSyncNode* pSyncNode) { syncRequestVoteReplyDestroy(pMsg); } +void syncNodeMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {} + // for debug -------------- void syncNodePrint(SSyncNode* pObj) { char* serialized = syncNode2Str(pObj); diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 1e40df4419..2447ea2218 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -820,8 +820,8 @@ cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg) { cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); cJSON_AddItemToObject(pRoot, "destId", pDestId); - snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->currentTerm); - cJSON_AddStringToObject(pRoot, "currentTerm", u64buf); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term); + cJSON_AddStringToObject(pRoot, "term", u64buf); snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->lastLogIndex); cJSON_AddStringToObject(pRoot, "lastLogIndex", u64buf); snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->lastLogTerm); @@ -1264,9 +1264,11 @@ cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg) { cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); cJSON_AddItemToObject(pRoot, "destId", pDestId); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term); + cJSON_AddStringToObject(pRoot, "term", u64buf); cJSON_AddNumberToObject(pRoot, "success", pMsg->success); snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->matchIndex); - cJSON_AddStringToObject(pRoot, "match_index", u64buf); + cJSON_AddStringToObject(pRoot, "matchIndex", u64buf); cJSON* pJson = cJSON_CreateObject(); cJSON_AddItemToObject(pJson, "SyncAppendEntriesReply", pRoot); diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index d4c57630ad..b935943a1d 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -17,6 +17,8 @@ #include "syncIndexMgr.h" #include "syncMessage.h" #include "syncRaftEntry.h" +#include "syncRaftLog.h" +#include "syncUtil.h" // TLA+ Spec // AppendEntries(i, j) == @@ -50,16 +52,31 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) { for (int i = 0; i < pSyncNode->peersNum; ++i) { SRaftId* pDestId = &(pSyncNode->peersId[i]); SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId); + SyncIndex preLogIndex = nextIndex - 1; - SyncTerm preLogTerm = 0; - if (preLogIndex >= 0) { + + SyncTerm preLogTerm = 0; + if (preLogIndex >= SYNC_INDEX_BEGIN) { SSyncRaftEntry* pPreEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, preLogIndex); preLogTerm = pPreEntry->term; - } else { - preLogTerm = 0; } - // SyncTerm lastIndex = - // pSyncNode->pLogStore->getLastIndex < nextIndex ? pSyncNode->pLogStore->getLastIndex : nextIndex; + + SyncIndex lastIndex = syncUtilMinIndex(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore), nextIndex); + assert(nextIndex == lastIndex); + + SSyncRaftEntry* pEntry = logStoreGetEntry(pSyncNode->pLogStore, nextIndex); + assert(pEntry != NULL); + + SyncAppendEntries* pMsg = syncAppendEntriesBuild(pEntry->bytes); + pMsg->srcId = pSyncNode->myRaftId; + pMsg->destId = *pDestId; + pMsg->prevLogIndex = preLogIndex; + pMsg->prevLogTerm = preLogTerm; + pMsg->commitIndex = pSyncNode->commitIndex; + pMsg->dataLen = pEntry->bytes; + // add pEntry into msg + + syncNodeAppendEntries(pSyncNode, pDestId, pMsg); } return ret; diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index 354c559a90..be4f40aaad 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -14,6 +14,10 @@ */ #include "syncRequestVote.h" +#include "syncInt.h" +#include "syncRaftStore.h" +#include "syncUtil.h" +#include "syncVoteMgr.h" // TLA+ Spec // HandleRequestVoteRequest(i, j, m) == @@ -37,4 +41,34 @@ // m) // /\ UNCHANGED <> // -int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {} +int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) { + int32_t ret = 0; + syncRequestVoteLog2("==syncNodeOnRequestVoteCb==", pMsg); + + if (pMsg->term > ths->pRaftStore->currentTerm) { + syncNodeUpdateTerm(ths, pMsg->term); + } + assert(pMsg->term <= ths->pRaftStore->currentTerm); + + bool logOK = (pMsg->lastLogTerm > ths->pLogStore->getLastTerm(ths->pLogStore)) || + ((pMsg->lastLogTerm == ths->pLogStore->getLastTerm(ths->pLogStore)) && + (pMsg->lastLogIndex >= ths->pLogStore->getLastIndex(ths->pLogStore))); + bool grant = (pMsg->term == ths->pRaftStore->currentTerm) && logOK && + ((!raftStoreHasVoted(ths->pRaftStore)) || (syncUtilSameId(&(ths->pRaftStore->voteFor), &(pMsg->srcId)))); + if (grant) { + raftStoreVote(ths->pRaftStore, &(pMsg->srcId)); + } + + SyncRequestVoteReply* pReply = syncRequestVoteReplyBuild(); + pReply->srcId = ths->myRaftId; + pReply->destId = pMsg->srcId; + pReply->term = ths->pRaftStore->currentTerm; + pReply->voteGranted = grant; + + SRpcMsg rpcMsg; + syncRequestVoteReply2RpcMsg(pReply, &rpcMsg); + syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); + syncRequestVoteReplyDestroy(pReply); + + return ret; +} diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c index 72223ea83c..7cdeace166 100644 --- a/source/libs/sync/src/syncRequestVoteReply.c +++ b/source/libs/sync/src/syncRequestVoteReply.c @@ -14,6 +14,10 @@ */ #include "syncRequestVoteReply.h" +#include "syncInt.h" +#include "syncRaftStore.h" +#include "syncUtil.h" +#include "syncVoteMgr.h" // TLA+ Spec // HandleRequestVoteResponse(i, j, m) == @@ -32,4 +36,33 @@ // /\ Discard(m) // /\ UNCHANGED <> // -int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) {} +int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) { + int32_t ret = 0; + syncRequestVoteReplyLog2("==syncNodeOnRequestVoteReplyCb==", pMsg); + + if (pMsg->term < ths->pRaftStore->currentTerm) { + sTrace("DropStaleResponse, receive term:%lu, current term:%lu", pMsg->term, ths->pRaftStore->currentTerm); + return ret; + } + + // no need this code, because if I receive reply.term, then I must have sent for that term. + // if (pMsg->term > ths->pRaftStore->currentTerm) { + // syncNodeUpdateTerm(ths, pMsg->term); + // } + + assert(pMsg->term == ths->pRaftStore->currentTerm); + + if (ths->state == TAOS_SYNC_STATE_CANDIDATE) { + votesRespondAdd(ths->pVotesRespond, pMsg); + if (pMsg->voteGranted) { + voteGrantedVote(ths->pVotesGranted, pMsg); + if (voteGrantedMajority(ths->pVotesGranted)) { + if (ths->pVotesGranted->toLeader) { + syncNodeCandidate2Leader(ths); + ths->pVotesGranted->toLeader = true; + } + } + } + } + return ret; +} diff --git a/source/libs/sync/test/syncRequestVoteTest.cpp b/source/libs/sync/test/syncRequestVoteTest.cpp index 7f75ee937b..22f47046de 100644 --- a/source/libs/sync/test/syncRequestVoteTest.cpp +++ b/source/libs/sync/test/syncRequestVoteTest.cpp @@ -20,7 +20,7 @@ SyncRequestVote *createMsg() { pMsg->srcId.vgId = 100; pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); pMsg->destId.vgId = 100; - pMsg->currentTerm = 11; + pMsg->term = 11; pMsg->lastLogIndex = 22; pMsg->lastLogTerm = 33; return pMsg; diff --git a/source/libs/sync/test/syncRpcMsgTest.cpp b/source/libs/sync/test/syncRpcMsgTest.cpp index 61edbd3012..ee6f6c0800 100644 --- a/source/libs/sync/test/syncRpcMsgTest.cpp +++ b/source/libs/sync/test/syncRpcMsgTest.cpp @@ -57,7 +57,7 @@ SyncRequestVote *createSyncRequestVote() { pMsg->srcId.vgId = 100; pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); pMsg->destId.vgId = 100; - pMsg->currentTerm = 11; + pMsg->term = 11; pMsg->lastLogIndex = 22; pMsg->lastLogTerm = 33; return pMsg; -- GitLab