diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 4a1557addd3bf4a533d215b0ee1684f2fe9cfd8d..15c719b76e08d27dda223275811f96433550ba84 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -232,6 +232,7 @@ void syncNodeCandidate2Follower(SSyncNode* pSyncNode); // raft vote -------------- void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId); void syncNodeVoteForSelf(SSyncNode* pSyncNode); +void syncNodeMaybeAdvanceCommitIndex(SSyncNode* pSyncNode); // for debug -------------- void syncNodePrint(SSyncNode* pObj); diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 9bb5b6195e7e238d88ec400dcc8cc086ec202b57..7dfea31f5c1240ca69c09a4757fdf72acd3ba8e1 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -196,7 +196,7 @@ typedef struct SyncRequestVote { SRaftId srcId; SRaftId destId; // private data - SyncTerm currentTerm; + SyncTerm term; SyncIndex lastLogIndex; SyncTerm lastLogTerm; } SyncRequestVote; @@ -287,6 +287,7 @@ typedef struct SyncAppendEntriesReply { SRaftId srcId; SRaftId destId; // private data + SyncTerm term; bool success; SyncIndex matchIndex; } SyncAppendEntriesReply; diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index ba10234a1d25da702810bd2f31c2d64f67200899..55d369a115dcb33a24b6446b0a24b18da036cba1 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -80,4 +80,9 @@ // /\ UNCHANGED <> // /\ UNCHANGED <> // -int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {} +int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { + int32_t ret = 0; + syncAppendEntriesLog2("==syncNodeOnAppendEntriesCb==", pMsg); + + return ret; +} diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 0a5120c8dc4b87f36adb63eb82c1ba6362744bf2..61eb4884e2b8e5013adcc6012dec72257d98eac9 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -14,6 +14,12 @@ */ #include "syncAppendEntriesReply.h" +#include "syncIndexMgr.h" +#include "syncInt.h" +#include "syncRaftLog.h" +#include "syncRaftStore.h" +#include "syncUtil.h" +#include "syncVoteMgr.h" // TLA+ Spec // HandleAppendEntriesResponse(i, j, m) == @@ -28,4 +34,41 @@ // /\ Discard(m) // /\ UNCHANGED <> // -int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {} +int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { + int32_t ret = 0; + syncAppendEntriesReplyLog2("==syncNodeOnAppendEntriesReplyCb==", pMsg); + + if (pMsg->term < ths->pRaftStore->currentTerm) { + sTrace("DropStaleResponse, receive term:%lu, current term:%lu", pMsg->term, ths->pRaftStore->currentTerm); + return ret; + } + + // no need this code, because if I receive reply.term, then I must have sent for that term. + // if (pMsg->term > ths->pRaftStore->currentTerm) { + // syncNodeUpdateTerm(ths, pMsg->term); + // } + + assert(pMsg->term == ths->pRaftStore->currentTerm); + + if (pMsg->success) { + // nextIndex = reply.matchIndex + 1 + syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1); + + // matchIndex = reply.matchIndex + syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex); + + // maybe commit + syncNodeMaybeAdvanceCommitIndex(ths); + + } else { + SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); + if (nextIndex > SYNC_INDEX_BEGIN) { + --nextIndex; + } else { + nextIndex = SYNC_INDEX_BEGIN; + } + syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex); + } + + return ret; +} diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index d06253c0494be79af5e8ddc29645fa2aae9978a3..77c3d076988762713a2ffe354c1e91ca54b2e8bd 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -38,7 +38,7 @@ int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode) { SyncRequestVote* pMsg = syncRequestVoteBuild(); pMsg->srcId = pSyncNode->myRaftId; pMsg->destId = pSyncNode->peersId[i]; - pMsg->currentTerm = pSyncNode->pRaftStore->currentTerm; + pMsg->term = pSyncNode->pRaftStore->currentTerm; pMsg->lastLogIndex = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore); pMsg->lastLogTerm = pSyncNode->pLogStore->getLastTerm(pSyncNode->pLogStore); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 5ec9df1e70c47fc947d686ef660738fcf603fe29..aaf6535f402fd1757433ca9253a5b04b28f9da0b 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -587,6 +587,8 @@ void syncNodeVoteForSelf(SSyncNode* pSyncNode) { syncRequestVoteReplyDestroy(pMsg); } +void syncNodeMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {} + // for debug -------------- void syncNodePrint(SSyncNode* pObj) { char* serialized = syncNode2Str(pObj); diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 1e40df44193e8c9e67d33e8ef4d15e8e739d2080..2447ea2218b0c0b38dbdbe391f4e9f46208e6194 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -820,8 +820,8 @@ cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg) { cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); cJSON_AddItemToObject(pRoot, "destId", pDestId); - snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->currentTerm); - cJSON_AddStringToObject(pRoot, "currentTerm", u64buf); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term); + cJSON_AddStringToObject(pRoot, "term", u64buf); snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->lastLogIndex); cJSON_AddStringToObject(pRoot, "lastLogIndex", u64buf); snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->lastLogTerm); @@ -1264,9 +1264,11 @@ cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg) { cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); cJSON_AddItemToObject(pRoot, "destId", pDestId); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term); + cJSON_AddStringToObject(pRoot, "term", u64buf); cJSON_AddNumberToObject(pRoot, "success", pMsg->success); snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->matchIndex); - cJSON_AddStringToObject(pRoot, "match_index", u64buf); + cJSON_AddStringToObject(pRoot, "matchIndex", u64buf); cJSON* pJson = cJSON_CreateObject(); cJSON_AddItemToObject(pJson, "SyncAppendEntriesReply", pRoot); diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index d4c57630add185cd3ccc6e3598f530d6d382d64a..b935943a1dd7dab3c69d2ae4c923504905ad2b14 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -17,6 +17,8 @@ #include "syncIndexMgr.h" #include "syncMessage.h" #include "syncRaftEntry.h" +#include "syncRaftLog.h" +#include "syncUtil.h" // TLA+ Spec // AppendEntries(i, j) == @@ -50,16 +52,31 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) { for (int i = 0; i < pSyncNode->peersNum; ++i) { SRaftId* pDestId = &(pSyncNode->peersId[i]); SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId); + SyncIndex preLogIndex = nextIndex - 1; - SyncTerm preLogTerm = 0; - if (preLogIndex >= 0) { + + SyncTerm preLogTerm = 0; + if (preLogIndex >= SYNC_INDEX_BEGIN) { SSyncRaftEntry* pPreEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, preLogIndex); preLogTerm = pPreEntry->term; - } else { - preLogTerm = 0; } - // SyncTerm lastIndex = - // pSyncNode->pLogStore->getLastIndex < nextIndex ? pSyncNode->pLogStore->getLastIndex : nextIndex; + + SyncIndex lastIndex = syncUtilMinIndex(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore), nextIndex); + assert(nextIndex == lastIndex); + + SSyncRaftEntry* pEntry = logStoreGetEntry(pSyncNode->pLogStore, nextIndex); + assert(pEntry != NULL); + + SyncAppendEntries* pMsg = syncAppendEntriesBuild(pEntry->bytes); + pMsg->srcId = pSyncNode->myRaftId; + pMsg->destId = *pDestId; + pMsg->prevLogIndex = preLogIndex; + pMsg->prevLogTerm = preLogTerm; + pMsg->commitIndex = pSyncNode->commitIndex; + pMsg->dataLen = pEntry->bytes; + // add pEntry into msg + + syncNodeAppendEntries(pSyncNode, pDestId, pMsg); } return ret; diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index 354c559a905470a9012b42897c75e88f73b46189..be4f40aaadd1aaa09bf3109341d59b988e63e646 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -14,6 +14,10 @@ */ #include "syncRequestVote.h" +#include "syncInt.h" +#include "syncRaftStore.h" +#include "syncUtil.h" +#include "syncVoteMgr.h" // TLA+ Spec // HandleRequestVoteRequest(i, j, m) == @@ -37,4 +41,34 @@ // m) // /\ UNCHANGED <> // -int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {} +int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) { + int32_t ret = 0; + syncRequestVoteLog2("==syncNodeOnRequestVoteCb==", pMsg); + + if (pMsg->term > ths->pRaftStore->currentTerm) { + syncNodeUpdateTerm(ths, pMsg->term); + } + assert(pMsg->term <= ths->pRaftStore->currentTerm); + + bool logOK = (pMsg->lastLogTerm > ths->pLogStore->getLastTerm(ths->pLogStore)) || + ((pMsg->lastLogTerm == ths->pLogStore->getLastTerm(ths->pLogStore)) && + (pMsg->lastLogIndex >= ths->pLogStore->getLastIndex(ths->pLogStore))); + bool grant = (pMsg->term == ths->pRaftStore->currentTerm) && logOK && + ((!raftStoreHasVoted(ths->pRaftStore)) || (syncUtilSameId(&(ths->pRaftStore->voteFor), &(pMsg->srcId)))); + if (grant) { + raftStoreVote(ths->pRaftStore, &(pMsg->srcId)); + } + + SyncRequestVoteReply* pReply = syncRequestVoteReplyBuild(); + pReply->srcId = ths->myRaftId; + pReply->destId = pMsg->srcId; + pReply->term = ths->pRaftStore->currentTerm; + pReply->voteGranted = grant; + + SRpcMsg rpcMsg; + syncRequestVoteReply2RpcMsg(pReply, &rpcMsg); + syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); + syncRequestVoteReplyDestroy(pReply); + + return ret; +} diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c index 72223ea83c7abf96be9c6e24c237a340c58c0acf..7cdeace1667577eeb781e1b0908e80405c1a2ded 100644 --- a/source/libs/sync/src/syncRequestVoteReply.c +++ b/source/libs/sync/src/syncRequestVoteReply.c @@ -14,6 +14,10 @@ */ #include "syncRequestVoteReply.h" +#include "syncInt.h" +#include "syncRaftStore.h" +#include "syncUtil.h" +#include "syncVoteMgr.h" // TLA+ Spec // HandleRequestVoteResponse(i, j, m) == @@ -32,4 +36,33 @@ // /\ Discard(m) // /\ UNCHANGED <> // -int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) {} +int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) { + int32_t ret = 0; + syncRequestVoteReplyLog2("==syncNodeOnRequestVoteReplyCb==", pMsg); + + if (pMsg->term < ths->pRaftStore->currentTerm) { + sTrace("DropStaleResponse, receive term:%lu, current term:%lu", pMsg->term, ths->pRaftStore->currentTerm); + return ret; + } + + // no need this code, because if I receive reply.term, then I must have sent for that term. + // if (pMsg->term > ths->pRaftStore->currentTerm) { + // syncNodeUpdateTerm(ths, pMsg->term); + // } + + assert(pMsg->term == ths->pRaftStore->currentTerm); + + if (ths->state == TAOS_SYNC_STATE_CANDIDATE) { + votesRespondAdd(ths->pVotesRespond, pMsg); + if (pMsg->voteGranted) { + voteGrantedVote(ths->pVotesGranted, pMsg); + if (voteGrantedMajority(ths->pVotesGranted)) { + if (ths->pVotesGranted->toLeader) { + syncNodeCandidate2Leader(ths); + ths->pVotesGranted->toLeader = true; + } + } + } + } + return ret; +} diff --git a/source/libs/sync/test/syncRequestVoteTest.cpp b/source/libs/sync/test/syncRequestVoteTest.cpp index 7f75ee937b17fc1b36370ef40559c6197a53e77b..22f47046dea3a8d98e00e5b3fa28d89dfdfd7a04 100644 --- a/source/libs/sync/test/syncRequestVoteTest.cpp +++ b/source/libs/sync/test/syncRequestVoteTest.cpp @@ -20,7 +20,7 @@ SyncRequestVote *createMsg() { pMsg->srcId.vgId = 100; pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); pMsg->destId.vgId = 100; - pMsg->currentTerm = 11; + pMsg->term = 11; pMsg->lastLogIndex = 22; pMsg->lastLogTerm = 33; return pMsg; diff --git a/source/libs/sync/test/syncRpcMsgTest.cpp b/source/libs/sync/test/syncRpcMsgTest.cpp index 61edbd30128f6a070069b47c4ba65f51e00d82d9..ee6f6c0800cc594ad3b67e1499451ade21a0a23d 100644 --- a/source/libs/sync/test/syncRpcMsgTest.cpp +++ b/source/libs/sync/test/syncRpcMsgTest.cpp @@ -57,7 +57,7 @@ SyncRequestVote *createSyncRequestVote() { pMsg->srcId.vgId = 100; pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); pMsg->destId.vgId = 100; - pMsg->currentTerm = 11; + pMsg->term = 11; pMsg->lastLogIndex = 22; pMsg->lastLogTerm = 33; return pMsg;