未验证 提交 f0917eee 编写于 作者: L Li Minghao 提交者: GitHub

Merge pull request #10822 from taosdata/feature/3.0_mhli

Feature/3.0 mhli
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_LIBS_SYNC_COMMIT_H
#define _TD_LIBS_SYNC_COMMIT_H
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "syncInt.h"
#include "taosdef.h"
// \* Leader i advances its commitIndex.
// \* This is done as a separate step from handling AppendEntries responses,
// \* in part to minimize atomic regions, and in part so that leaders of
// \* single-server clusters are able to mark entries committed.
// AdvanceCommitIndex(i) ==
// /\ state[i] = Leader
// /\ LET \* The set of servers that agree up through index.
// Agree(index) == {i} \cup {k \in Server :
// matchIndex[i][k] >= index}
// \* The maximum indexes for which a quorum agrees
// agreeIndexes == {index \in 1..Len(log[i]) :
// Agree(index) \in Quorum}
// \* New value for commitIndex'[i]
// newCommitIndex ==
// IF /\ agreeIndexes /= {}
// /\ log[i][Max(agreeIndexes)].term = currentTerm[i]
// THEN
// Max(agreeIndexes)
// ELSE
// commitIndex[i]
// IN commitIndex' = [commitIndex EXCEPT ![i] = newCommitIndex]
// /\ UNCHANGED <<messages, serverVars, candidateVars, leaderVars, log>>
//
void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode);
bool syncAgreeIndex(SSyncNode* pSyncNode, SRaftId* pRaftId, SyncIndex index);
bool syncAgree(SSyncNode* pSyncNode, SyncIndex index);
#ifdef __cplusplus
}
#endif
#endif /*_TD_LIBS_SYNC_COMMIT_H*/
...@@ -236,7 +236,6 @@ void syncNodeCandidate2Follower(SSyncNode* pSyncNode); ...@@ -236,7 +236,6 @@ void syncNodeCandidate2Follower(SSyncNode* pSyncNode);
// raft vote -------------- // raft vote --------------
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId); void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId);
void syncNodeVoteForSelf(SSyncNode* pSyncNode); void syncNodeVoteForSelf(SSyncNode* pSyncNode);
void syncNodeMaybeAdvanceCommitIndex(SSyncNode* pSyncNode);
// for debug -------------- // for debug --------------
void syncNodePrint(SSyncNode* pObj); void syncNodePrint(SSyncNode* pObj);
......
...@@ -25,6 +25,46 @@ extern "C" { ...@@ -25,6 +25,46 @@ extern "C" {
#include <stdlib.h> #include <stdlib.h>
#include "taosdef.h" #include "taosdef.h"
// TLA+ Spec
// Receive(m) ==
// LET i == m.mdest
// j == m.msource
// IN \* Any RPC with a newer term causes the recipient to advance
// \* its term first. Responses with stale terms are ignored.
// \/ UpdateTerm(i, j, m)
// \/ /\ m.mtype = RequestVoteRequest
// /\ HandleRequestVoteRequest(i, j, m)
// \/ /\ m.mtype = RequestVoteResponse
// /\ \/ DropStaleResponse(i, j, m)
// \/ HandleRequestVoteResponse(i, j, m)
// \/ /\ m.mtype = AppendEntriesRequest
// /\ HandleAppendEntriesRequest(i, j, m)
// \/ /\ m.mtype = AppendEntriesResponse
// /\ \/ DropStaleResponse(i, j, m)
// \/ HandleAppendEntriesResponse(i, j, m)
// DuplicateMessage(m) ==
// /\ Send(m)
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
// DropMessage(m) ==
// /\ Discard(m)
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
// Next == /\ \/ \E i \in Server : Restart(i)
// \/ \E i \in Server : Timeout(i)
// \/ \E i,j \in Server : RequestVote(i, j)
// \/ \E i \in Server : BecomeLeader(i)
// \/ \E i \in Server, v \in Value : ClientRequest(i, v)
// \/ \E i \in Server : AdvanceCommitIndex(i)
// \/ \E i,j \in Server : AppendEntries(i, j)
// \/ \E m \in DOMAIN messages : Receive(m)
// \/ \E m \in DOMAIN messages : DuplicateMessage(m)
// \/ \E m \in DOMAIN messages : DropMessage(m)
// \* History variable that tracks every log ever:
// /\ allLogs' = allLogs \cup {log[i] : i \in Server}
//
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -94,6 +94,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -94,6 +94,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
} }
assert(pMsg->term <= ths->pRaftStore->currentTerm); assert(pMsg->term <= ths->pRaftStore->currentTerm);
// reset elect timer
if (pMsg->term == ths->pRaftStore->currentTerm) { if (pMsg->term == ths->pRaftStore->currentTerm) {
ths->leaderCache = pMsg->srcId; ths->leaderCache = pMsg->srcId;
syncNodeResetElectTimer(ths); syncNodeResetElectTimer(ths);
...@@ -135,38 +136,117 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -135,38 +136,117 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) { if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) {
syncNodeBecomeFollower(ths); syncNodeBecomeFollower(ths);
// need ret? // ret or reply?
return ret; return ret;
} }
// accept request // accept request
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_FOLLOWER && logOK) { if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_FOLLOWER && logOK) {
bool matchSuccess = false; bool preMatch = false;
if (pMsg->prevLogIndex == SYNC_INDEX_INVALID && if (pMsg->prevLogIndex == SYNC_INDEX_INVALID &&
ths->pLogStore->getLastIndex(ths->pLogStore) == SYNC_INDEX_INVALID) { ths->pLogStore->getLastIndex(ths->pLogStore) == SYNC_INDEX_INVALID) {
matchSuccess = true; preMatch = true;
} }
if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) { if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
SSyncRaftEntry* pPreEntry = logStoreGetEntry(ths->pLogStore, pMsg->prevLogIndex); SSyncRaftEntry* pPreEntry = logStoreGetEntry(ths->pLogStore, pMsg->prevLogIndex);
assert(pPreEntry != NULL); assert(pPreEntry != NULL);
if (pMsg->prevLogTerm == pPreEntry->term) { if (pMsg->prevLogTerm == pPreEntry->term) {
matchSuccess = true; preMatch = true;
} }
syncEntryDestory(pPreEntry); syncEntryDestory(pPreEntry);
} }
if (matchSuccess) { if (preMatch) {
// delete conflict entries // must has preIndex in local log
if (pMsg->prevLogIndex < ths->pLogStore->getLastIndex(ths->pLogStore)) { assert(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore));
SyncIndex fromIndex = pMsg->prevLogIndex + 1;
ths->pLogStore->truncate(ths->pLogStore, fromIndex); bool hasExtraEntries = pMsg->prevLogIndex < ths->pLogStore->getLastIndex(ths->pLogStore);
} bool hasAppendEntries = pMsg->dataLen > 0;
if (hasExtraEntries && hasAppendEntries) {
// conflict
bool conflict = false;
SyncIndex extraIndex = pMsg->prevLogIndex + 1;
SSyncRaftEntry* pExtraEntry = logStoreGetEntry(ths->pLogStore, extraIndex);
assert(pExtraEntry != NULL);
SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
assert(pAppendEntry != NULL);
assert(extraIndex == pAppendEntry->index);
if (pExtraEntry->term == pAppendEntry->term) {
conflict = true;
}
if (conflict) {
// roll back
SyncIndex delBegin = ths->pLogStore->getLastIndex(ths->pLogStore);
SyncIndex delEnd = extraIndex;
// notice! reverse roll back!
for (SyncIndex index = delEnd; index >= delBegin; --index) {
if (ths->pFsm->FpRollBackCb != NULL) {
SSyncRaftEntry* pRollBackEntry = logStoreGetEntry(ths->pLogStore, index);
assert(pRollBackEntry != NULL);
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);
ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, pRollBackEntry->index, pRollBackEntry->isWeak, 0);
rpcFreeCont(rpcMsg.pCont);
syncEntryDestory(pRollBackEntry);
}
}
// delete confict entries
ths->pLogStore->truncate(ths->pLogStore, extraIndex);
// append new entries
ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry);
// pre commit
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
if (ths->pFsm != NULL) {
if (ths->pFsm->FpPreCommitCb != NULL) {
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 0);
}
}
rpcFreeCont(rpcMsg.pCont);
}
// free memory
syncEntryDestory(pExtraEntry);
syncEntryDestory(pAppendEntry);
// append one entry } else if (hasExtraEntries && !hasAppendEntries) {
if (pMsg->dataLen > 0) { // do nothing
SSyncRaftEntry* pEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
ths->pLogStore->appendEntry(ths->pLogStore, pEntry); } else if (!hasExtraEntries && hasAppendEntries) {
syncEntryDestory(pEntry); SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
assert(pAppendEntry != NULL);
// append new entries
ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry);
// pre commit
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
if (ths->pFsm != NULL) {
if (ths->pFsm->FpPreCommitCb != NULL) {
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 0);
}
}
rpcFreeCont(rpcMsg.pCont);
// free memory
syncEntryDestory(pAppendEntry);
} else if (!hasExtraEntries && !hasAppendEntries) {
// do nothing
} else {
assert(0);
} }
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(); SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild();
...@@ -175,7 +255,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -175,7 +255,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
pReply->term = ths->pRaftStore->currentTerm; pReply->term = ths->pRaftStore->currentTerm;
pReply->success = true; pReply->success = true;
if (pMsg->dataLen > 0) { if (hasAppendEntries) {
pReply->matchIndex = pMsg->prevLogIndex + 1; pReply->matchIndex = pMsg->prevLogIndex + 1;
} else { } else {
pReply->matchIndex = pMsg->prevLogIndex; pReply->matchIndex = pMsg->prevLogIndex;
...@@ -201,11 +281,38 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -201,11 +281,38 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
syncAppendEntriesReplyDestroy(pReply); syncAppendEntriesReplyDestroy(pReply);
} }
// maybe update commit index from leader
if (pMsg->commitIndex > ths->commitIndex) { if (pMsg->commitIndex > ths->commitIndex) {
// has commit entry in local
if (pMsg->commitIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) { if (pMsg->commitIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
// commit SyncIndex beginIndex = ths->commitIndex + 1;
SyncIndex endIndex = pMsg->commitIndex;
// update commit index
ths->commitIndex = pMsg->commitIndex; ths->commitIndex = pMsg->commitIndex;
// call back Wal
ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex); ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex);
// execute fsm
if (ths->pFsm != NULL) {
for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
if (i != SYNC_INDEX_INVALID) {
SSyncRaftEntry* pEntry = ths->pLogStore->getEntry(ths->pLogStore, i);
assert(pEntry != NULL);
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg);
if (ths->pFsm->FpCommitCb != NULL) {
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0);
}
rpcFreeCont(rpcMsg.pCont);
syncEntryDestory(pEntry);
}
}
}
} }
} }
} }
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
*/ */
#include "syncAppendEntriesReply.h" #include "syncAppendEntriesReply.h"
#include "syncCommit.h"
#include "syncIndexMgr.h" #include "syncIndexMgr.h"
#include "syncInt.h" #include "syncInt.h"
#include "syncRaftLog.h" #include "syncRaftLog.h"
...@@ -59,7 +60,7 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p ...@@ -59,7 +60,7 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p
syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex); syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex);
// maybe commit // maybe commit
syncNodeMaybeAdvanceCommitIndex(ths); syncMaybeAdvanceCommitIndex(ths);
} else { } else {
SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
......
...@@ -13,9 +13,11 @@ ...@@ -13,9 +13,11 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "syncCommit.h"
#include "syncIndexMgr.h" #include "syncIndexMgr.h"
#include "syncInt.h" #include "syncInt.h"
#include "syncRaftLog.h" #include "syncRaftLog.h"
#include "syncRaftStore.h"
// \* Leader i advances its commitIndex. // \* Leader i advances its commitIndex.
// \* This is done as a separate step from handling AppendEntries responses, // \* This is done as a separate step from handling AppendEntries responses,
...@@ -40,30 +42,80 @@ ...@@ -40,30 +42,80 @@
// IN commitIndex' = [commitIndex EXCEPT ![i] = newCommitIndex] // IN commitIndex' = [commitIndex EXCEPT ![i] = newCommitIndex]
// /\ UNCHANGED <<messages, serverVars, candidateVars, leaderVars, log>> // /\ UNCHANGED <<messages, serverVars, candidateVars, leaderVars, log>>
// //
void syncNodeMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pNextIndex", pSyncNode->pNextIndex); syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pNextIndex", pSyncNode->pNextIndex);
syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pMatchIndex", pSyncNode->pMatchIndex); syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pMatchIndex", pSyncNode->pMatchIndex);
// update commit index // update commit index
SyncIndex newCommitIndex = pSyncNode->commitIndex;
for (SyncIndex index = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore); index > pSyncNode->commitIndex;
++index) {
if (syncAgree(pSyncNode, index)) {
// term
SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, index);
assert(pEntry != NULL);
if (pSyncNode->pFsm != NULL) { // cannot commit, even if quorum agree. need check term!
SyncIndex beginIndex = SYNC_INDEX_INVALID; if (pEntry->term == pSyncNode->pRaftStore->currentTerm) {
SyncIndex endIndex = SYNC_INDEX_INVALID; // update commit index
for (SyncIndex i = beginIndex; i <= endIndex; ++i) { newCommitIndex = index;
if (i != SYNC_INDEX_INVALID) { break;
SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, i); }
assert(pEntry != NULL); }
}
SRpcMsg rpcMsg; if (newCommitIndex > pSyncNode->commitIndex) {
syncEntry2OriginalRpc(pEntry, &rpcMsg); SyncIndex beginIndex = pSyncNode->commitIndex + 1;
SyncIndex endIndex = newCommitIndex;
if (pSyncNode->pFsm->FpCommitCb != NULL) { // update commit index
pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0); pSyncNode->commitIndex = newCommitIndex;
}
// call back Wal
pSyncNode->pLogStore->updateCommitIndex(pSyncNode->pLogStore, pSyncNode->commitIndex);
// execute fsm
if (pSyncNode->pFsm != NULL) {
for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
if (i != SYNC_INDEX_INVALID) {
SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, i);
assert(pEntry != NULL);
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg);
if (pSyncNode->pFsm->FpCommitCb != NULL) {
pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0);
}
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
syncEntryDestory(pEntry); syncEntryDestory(pEntry);
}
} }
} }
} }
}
bool syncAgreeIndex(SSyncNode* pSyncNode, SRaftId* pRaftId, SyncIndex index) {
SyncIndex matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, pRaftId);
// b for debug
bool b = false;
if (matchIndex >= index) {
b = true;
}
return b;
}
bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) {
int agreeCount = 0;
for (int i = 0; i < pSyncNode->replicaNum; ++i) {
if (syncAgreeIndex(pSyncNode, &(pSyncNode->replicasId[i]), index)) {
++agreeCount;
}
if (agreeCount >= pSyncNode->quorum) {
return true;
}
}
return false;
} }
\ No newline at end of file
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include "sync.h" #include "sync.h"
#include "syncAppendEntries.h" #include "syncAppendEntries.h"
#include "syncAppendEntriesReply.h" #include "syncAppendEntriesReply.h"
#include "syncCommit.h"
#include "syncElection.h" #include "syncElection.h"
#include "syncEnv.h" #include "syncEnv.h"
#include "syncIndexMgr.h" #include "syncIndexMgr.h"
...@@ -150,6 +151,30 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { ...@@ -150,6 +151,30 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
// init life cycle // init life cycle
// TLA+ Spec
// InitHistoryVars == /\ elections = {}
// /\ allLogs = {}
// /\ voterLog = [i \in Server |-> [j \in {} |-> <<>>]]
// InitServerVars == /\ currentTerm = [i \in Server |-> 1]
// /\ state = [i \in Server |-> Follower]
// /\ votedFor = [i \in Server |-> Nil]
// InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
// /\ votesGranted = [i \in Server |-> {}]
// \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
// \* leader does not send itself messages. It's still easier to include these
// \* in the functions.
// InitLeaderVars == /\ nextIndex = [i \in Server |-> [j \in Server |-> 1]]
// /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
// InitLogVars == /\ log = [i \in Server |-> << >>]
// /\ commitIndex = [i \in Server |-> 0]
// Init == /\ messages = [m \in {} |-> 0]
// /\ InitHistoryVars
// /\ InitServerVars
// /\ InitCandidateVars
// /\ InitLeaderVars
// /\ InitLogVars
//
// init TLA+ server vars // init TLA+ server vars
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
pSyncNode->pRaftStore = raftStoreOpen(pSyncNode->raftStorePath); pSyncNode->pRaftStore = raftStoreOpen(pSyncNode->raftStorePath);
...@@ -727,6 +752,16 @@ static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) { ...@@ -727,6 +752,16 @@ static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) {
return ret; return ret;
} }
// TLA+ Spec
// ClientRequest(i, v) ==
// /\ state[i] = Leader
// /\ LET entry == [term |-> currentTerm[i],
// value |-> v]
// newLog == Append(log[i], entry)
// IN log' = [log EXCEPT ![i] = newLog]
// /\ UNCHANGED <<messages, serverVars, candidateVars,
// leaderVars, commitIndex>>
//
static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) { static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) {
int32_t ret = 0; int32_t ret = 0;
syncClientRequestLog2("==syncNodeOnClientRequestCb==", pMsg); syncClientRequestLog2("==syncNodeOnClientRequestCb==", pMsg);
...@@ -740,7 +775,7 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg ...@@ -740,7 +775,7 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg
ths->pLogStore->appendEntry(ths->pLogStore, pEntry); ths->pLogStore->appendEntry(ths->pLogStore, pEntry);
// only myself, maybe commit // only myself, maybe commit
syncNodeMaybeAdvanceCommitIndex(ths); syncMaybeAdvanceCommitIndex(ths);
// start replicate right now! // start replicate right now!
syncNodeReplicate(ths); syncNodeReplicate(ths);
......
...@@ -14,3 +14,43 @@ ...@@ -14,3 +14,43 @@
*/ */
#include "syncOnMessage.h" #include "syncOnMessage.h"
// TLA+ Spec
// Receive(m) ==
// LET i == m.mdest
// j == m.msource
// IN \* Any RPC with a newer term causes the recipient to advance
// \* its term first. Responses with stale terms are ignored.
// \/ UpdateTerm(i, j, m)
// \/ /\ m.mtype = RequestVoteRequest
// /\ HandleRequestVoteRequest(i, j, m)
// \/ /\ m.mtype = RequestVoteResponse
// /\ \/ DropStaleResponse(i, j, m)
// \/ HandleRequestVoteResponse(i, j, m)
// \/ /\ m.mtype = AppendEntriesRequest
// /\ HandleAppendEntriesRequest(i, j, m)
// \/ /\ m.mtype = AppendEntriesResponse
// /\ \/ DropStaleResponse(i, j, m)
// \/ HandleAppendEntriesResponse(i, j, m)
// DuplicateMessage(m) ==
// /\ Send(m)
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
// DropMessage(m) ==
// /\ Discard(m)
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
// Next == /\ \/ \E i \in Server : Restart(i)
// \/ \E i \in Server : Timeout(i)
// \/ \E i,j \in Server : RequestVote(i, j)
// \/ \E i \in Server : BecomeLeader(i)
// \/ \E i \in Server, v \in Value : ClientRequest(i, v)
// \/ \E i \in Server : AdvanceCommitIndex(i)
// \/ \E i,j \in Server : AppendEntries(i, j)
// \/ \E m \in DOMAIN messages : Receive(m)
// \/ \E m \in DOMAIN messages : DuplicateMessage(m)
// \/ \E m \in DOMAIN messages : DropMessage(m)
// \* History variable that tracks every log ever:
// /\ allLogs' = allLogs \cup {log[i] : i \in Server}
//
\ No newline at end of file
...@@ -162,7 +162,7 @@ int main(int argc, char **argv) { ...@@ -162,7 +162,7 @@ int main(int argc, char **argv) {
SyncClientRequest *pMsg1 = step1(pMsg0); SyncClientRequest *pMsg1 = step1(pMsg0);
syncClientRequestPrint2((char *)"==step1==", pMsg1); syncClientRequestPrint2((char *)"==step1==", pMsg1);
for (int i = 0; i < 5; ++i) { for (int i = 0; i < 10; ++i) {
SyncClientRequest *pSyncClientRequest = pMsg1; SyncClientRequest *pSyncClientRequest = pMsg1;
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncClientRequest2RpcMsg(pSyncClientRequest, &rpcMsg); syncClientRequest2RpcMsg(pSyncClientRequest, &rpcMsg);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册