diff --git a/source/libs/sync/inc/syncAppendEntries.h b/source/libs/sync/inc/syncAppendEntries.h index 0156e695a3ead53bfb2e2e8f5b05d75891c78ca2..35d3046d66e5557785d1179757733a764d323086 100644 --- a/source/libs/sync/inc/syncAppendEntries.h +++ b/source/libs/sync/inc/syncAppendEntries.h @@ -28,6 +28,71 @@ extern "C" { #include "syncRaft.h" #include "taosdef.h" +// TLA+ Spec +// HandleAppendEntriesRequest(i, j, m) == +// LET logOk == \/ m.mprevLogIndex = 0 +// \/ /\ m.mprevLogIndex > 0 +// /\ m.mprevLogIndex <= Len(log[i]) +// /\ m.mprevLogTerm = log[i][m.mprevLogIndex].term +// IN /\ m.mterm <= currentTerm[i] +// /\ \/ /\ \* reject request +// \/ m.mterm < currentTerm[i] +// \/ /\ m.mterm = currentTerm[i] +// /\ state[i] = Follower +// /\ \lnot logOk +// /\ Reply([mtype |-> AppendEntriesResponse, +// mterm |-> currentTerm[i], +// msuccess |-> FALSE, +// mmatchIndex |-> 0, +// msource |-> i, +// mdest |-> j], +// m) +// /\ UNCHANGED <> +// \/ \* return to follower state +// /\ m.mterm = currentTerm[i] +// /\ state[i] = Candidate +// /\ state' = [state EXCEPT ![i] = Follower] +// /\ UNCHANGED <> +// \/ \* accept request +// /\ m.mterm = currentTerm[i] +// /\ state[i] = Follower +// /\ logOk +// /\ LET index == m.mprevLogIndex + 1 +// IN \/ \* already done with request +// /\ \/ m.mentries = << >> +// \/ /\ m.mentries /= << >> +// /\ Len(log[i]) >= index +// /\ log[i][index].term = m.mentries[1].term +// \* This could make our commitIndex decrease (for +// \* example if we process an old, duplicated request), +// \* but that doesn't really affect anything. +// /\ commitIndex' = [commitIndex EXCEPT ![i] = +// m.mcommitIndex] +// /\ Reply([mtype |-> AppendEntriesResponse, +// mterm |-> currentTerm[i], +// msuccess |-> TRUE, +// mmatchIndex |-> m.mprevLogIndex + +// Len(m.mentries), +// msource |-> i, +// mdest |-> j], +// m) +// /\ UNCHANGED <> +// \/ \* conflict: remove 1 entry +// /\ m.mentries /= << >> +// /\ Len(log[i]) >= index +// /\ log[i][index].term /= m.mentries[1].term +// /\ LET new == [index2 \in 1..(Len(log[i]) - 1) |-> +// log[i][index2]] +// IN log' = [log EXCEPT ![i] = new] +// /\ UNCHANGED <> +// \/ \* no conflict: append entry +// /\ m.mentries /= << >> +// /\ Len(log[i]) = m.mprevLogIndex +// /\ log' = [log EXCEPT ![i] = +// Append(log[i], m.mentries[1])] +// /\ UNCHANGED <> +// /\ UNCHANGED <> +// int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg); #ifdef __cplusplus diff --git a/source/libs/sync/inc/syncAppendEntriesReply.h b/source/libs/sync/inc/syncAppendEntriesReply.h index 7b80172e8dbd6b27664f0b43a4b3b5bb539a5ec5..75b82aa5316fb0e744066ada9d1a3a4c9aba8b9b 100644 --- a/source/libs/sync/inc/syncAppendEntriesReply.h +++ b/source/libs/sync/inc/syncAppendEntriesReply.h @@ -28,6 +28,19 @@ extern "C" { #include "syncRaft.h" #include "taosdef.h" +// TLA+ Spec +// HandleAppendEntriesResponse(i, j, m) == +// /\ m.mterm = currentTerm[i] +// /\ \/ /\ m.msuccess \* successful +// /\ nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1] +// /\ matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex] +// \/ /\ \lnot m.msuccess \* not successful +// /\ nextIndex' = [nextIndex EXCEPT ![i][j] = +// Max({nextIndex[i][j] - 1, 1})] +// /\ UNCHANGED <> +// /\ Discard(m) +// /\ UNCHANGED <> +// int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); #ifdef __cplusplus diff --git a/source/libs/sync/inc/syncElection.h b/source/libs/sync/inc/syncElection.h index abacfb8093067a2c2a1764e2c4663efbcdc151b1..019c291efc03a59e56fae223717718cf0a416de1 100644 --- a/source/libs/sync/inc/syncElection.h +++ b/source/libs/sync/inc/syncElection.h @@ -26,10 +26,21 @@ extern "C" { #include "syncInt.h" #include "taosdef.h" -int32_t syncNodeElect(SSyncNode* pSyncNode); - +// TLA+ Spec +// RequestVote(i, j) == +// /\ state[i] = Candidate +// /\ j \notin votesResponded[i] +// /\ Send([mtype |-> RequestVoteRequest, +// mterm |-> currentTerm[i], +// mlastLogTerm |-> LastTerm(log[i]), +// mlastLogIndex |-> Len(log[i]), +// msource |-> i, +// mdest |-> j]) +// /\ UNCHANGED <> +// int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode); +int32_t syncNodeElect(SSyncNode* pSyncNode); int32_t syncNodeRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg); #ifdef __cplusplus diff --git a/source/libs/sync/inc/syncIO.h b/source/libs/sync/inc/syncIO.h index 160fefd086cc7e4a0334e2156883da1ba5034046..a948de8ac16d91d03a1eb1adb95586b94200f861 100644 --- a/source/libs/sync/inc/syncIO.h +++ b/source/libs/sync/inc/syncIO.h @@ -31,11 +31,11 @@ extern "C" { typedef struct SSyncIO { STaosQueue *pMsgQ; - STaosQset * pQset; + STaosQset *pQset; pthread_t consumerTid; - void * serverRpc; - void * clientRpc; + void *serverRpc; + void *clientRpc; SEpSet myAddr; void *ioTimerTickQ; diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index bce03059a04e8a29015aa4f18a8c71a2d2a55eb4..b8c7eb60e732d2b678e80dce2b0e67f5dbe5f5ff 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -192,9 +192,9 @@ void syncNodeClose(SSyncNode* pSyncNode); int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg); int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg); int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg); -void syncNodePingAll(SSyncNode* pSyncNode); -void syncNodePingPeers(SSyncNode* pSyncNode); -void syncNodePingSelf(SSyncNode* pSyncNode); +int32_t syncNodePingAll(SSyncNode* pSyncNode); +int32_t syncNodePingPeers(SSyncNode* pSyncNode); +int32_t syncNodePingSelf(SSyncNode* pSyncNode); int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode); int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode); diff --git a/source/libs/sync/inc/syncReplication.h b/source/libs/sync/inc/syncReplication.h index 72ce986a7ea258684fbff8a9b5283c99a92cac9d..467cfdde5cfe6384ec910cefc3217cf88c2f5b09 100644 --- a/source/libs/sync/inc/syncReplication.h +++ b/source/libs/sync/inc/syncReplication.h @@ -26,6 +26,31 @@ extern "C" { #include "syncInt.h" #include "taosdef.h" +// TLA+ Spec +// AppendEntries(i, j) == +// /\ i /= j +// /\ state[i] = Leader +// /\ LET prevLogIndex == nextIndex[i][j] - 1 +// prevLogTerm == IF prevLogIndex > 0 THEN +// log[i][prevLogIndex].term +// ELSE +// 0 +// \* Send up to 1 entry, constrained by the end of the log. +// lastEntry == Min({Len(log[i]), nextIndex[i][j]}) +// entries == SubSeq(log[i], nextIndex[i][j], lastEntry) +// IN Send([mtype |-> AppendEntriesRequest, +// mterm |-> currentTerm[i], +// mprevLogIndex |-> prevLogIndex, +// mprevLogTerm |-> prevLogTerm, +// mentries |-> entries, +// \* mlog is used as a history variable for the proof. +// \* It would not exist in a real implementation. +// mlog |-> log[i], +// mcommitIndex |-> Min({commitIndex[i], lastEntry}), +// msource |-> i, +// mdest |-> j]) +// /\ UNCHANGED <> +// int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode); int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg); diff --git a/source/libs/sync/inc/syncRequestVote.h b/source/libs/sync/inc/syncRequestVote.h index da821c3ebd07833834d5ded9372c67e3ac00b30d..8bb4976de25b252c455fea5ddc95d8f2ae73f1b8 100644 --- a/source/libs/sync/inc/syncRequestVote.h +++ b/source/libs/sync/inc/syncRequestVote.h @@ -28,6 +28,28 @@ extern "C" { #include "syncRaft.h" #include "taosdef.h" +// TLA+ Spec +// HandleRequestVoteRequest(i, j, m) == +// LET logOk == \/ m.mlastLogTerm > LastTerm(log[i]) +// \/ /\ m.mlastLogTerm = LastTerm(log[i]) +// /\ m.mlastLogIndex >= Len(log[i]) +// grant == /\ m.mterm = currentTerm[i] +// /\ logOk +// /\ votedFor[i] \in {Nil, j} +// IN /\ m.mterm <= currentTerm[i] +// /\ \/ grant /\ votedFor' = [votedFor EXCEPT ![i] = j] +// \/ ~grant /\ UNCHANGED votedFor +// /\ Reply([mtype |-> RequestVoteResponse, +// mterm |-> currentTerm[i], +// mvoteGranted |-> grant, +// \* mlog is used just for the `elections' history variable for +// \* the proof. It would not exist in a real implementation. +// mlog |-> log[i], +// msource |-> i, +// mdest |-> j], +// m) +// /\ UNCHANGED <> +// int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg); #ifdef __cplusplus diff --git a/source/libs/sync/inc/syncRequestVoteReply.h b/source/libs/sync/inc/syncRequestVoteReply.h index 82f132f80bc8083e2e7c339ef2b0145b6261a66f..ab9430b857f1cc0a40ffa92651bb1bc955fe624e 100644 --- a/source/libs/sync/inc/syncRequestVoteReply.h +++ b/source/libs/sync/inc/syncRequestVoteReply.h @@ -28,6 +28,23 @@ extern "C" { #include "syncRaft.h" #include "taosdef.h" +// TLA+ Spec +// HandleRequestVoteResponse(i, j, m) == +// \* This tallies votes even when the current state is not Candidate, but +// \* they won't be looked at, so it doesn't matter. +// /\ m.mterm = currentTerm[i] +// /\ votesResponded' = [votesResponded EXCEPT ![i] = +// votesResponded[i] \cup {j}] +// /\ \/ /\ m.mvoteGranted +// /\ votesGranted' = [votesGranted EXCEPT ![i] = +// votesGranted[i] \cup {j}] +// /\ voterLog' = [voterLog EXCEPT ![i] = +// voterLog[i] @@ (j :> m.mlog)] +// \/ /\ ~m.mvoteGranted +// /\ UNCHANGED <> +// /\ Discard(m) +// /\ UNCHANGED <> +// int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg); #ifdef __cplusplus diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 23df8a539cbf229adacec73b1bc71379e2c5a4f0..0a5120c8dc4b87f36adb63eb82c1ba6362744bf2 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -27,4 +27,5 @@ // /\ UNCHANGED <> // /\ Discard(m) // /\ UNCHANGED <> +// int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {} diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index fe86d220cca8579cd9ca235a149b943b0933fcc4..87017b718d56b62ec6ace7046a33f86da55fa88f 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -16,11 +16,6 @@ #include "syncElection.h" #include "syncMessage.h" -int32_t syncNodeElect(SSyncNode* pSyncNode) { - // start election - syncNodeRequestVotePeers(pSyncNode); -} - // TLA+ Spec // RequestVote(i, j) == // /\ state[i] = Candidate @@ -32,8 +27,14 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) { // msource |-> i, // mdest |-> j]) // /\ UNCHANGED <> +// int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode) {} +int32_t syncNodeElect(SSyncNode* pSyncNode) { + // start election + syncNodeRequestVotePeers(pSyncNode); +} + int32_t syncNodeRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg) { sTrace("syncNodeRequestVote pSyncNode:%p ", pSyncNode); int32_t ret = 0; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index cac2b6953de927fae07e5cc5450d508461819ed3..ef30254557c151331995980916feabf9a90e3d46 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -177,7 +177,7 @@ int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* return ret; } -void syncNodePingAll(SSyncNode* pSyncNode) { +int32_t syncNodePingAll(SSyncNode* pSyncNode) { sTrace("syncNodePingAll pSyncNode:%p ", pSyncNode); int32_t ret = 0; for (int i = 0; i < pSyncNode->syncCfg.replicaNum; ++i) { @@ -190,7 +190,7 @@ void syncNodePingAll(SSyncNode* pSyncNode) { } } -void syncNodePingPeers(SSyncNode* pSyncNode) { +int32_t syncNodePingPeers(SSyncNode* pSyncNode) { int32_t ret = 0; for (int i = 0; i < pSyncNode->peersNum; ++i) { SRaftId destId; @@ -202,7 +202,7 @@ void syncNodePingPeers(SSyncNode* pSyncNode) { } } -void syncNodePingSelf(SSyncNode* pSyncNode) { +int32_t syncNodePingSelf(SSyncNode* pSyncNode) { int32_t ret; SyncPing* pMsg = syncPingBuild3(&pSyncNode->raftId, &pSyncNode->raftId); ret = syncNodePing(pSyncNode, &pMsg->destId, pMsg); diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 878a87067758909f8a5955578bd9be13b9c69a9c..37e8959ff33a3c2f58db762ab9fb9053154c4e87 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -40,6 +40,7 @@ // msource |-> i, // mdest |-> j]) // /\ UNCHANGED <> +// int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {} int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg) { diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index 533043c5126a1bed2cd1a6af56188abc72d77c82..354c559a905470a9012b42897c75e88f73b46189 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -36,4 +36,5 @@ // mdest |-> j], // m) // /\ UNCHANGED <> +// int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {} diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c index a5b434dbc52f1a442552d0b42177546b4a1a6559..72223ea83c7abf96be9c6e24c237a340c58c0acf 100644 --- a/source/libs/sync/src/syncRequestVoteReply.c +++ b/source/libs/sync/src/syncRequestVoteReply.c @@ -31,4 +31,5 @@ // /\ UNCHANGED <> // /\ Discard(m) // /\ UNCHANGED <> +// int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) {}