From cf0857e113212ee823f4e5f298bcf4ac5e40871e Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 7 Mar 2022 16:59:16 +0800 Subject: [PATCH] sync refactor --- source/libs/sync/inc/syncIO.h | 6 +++--- source/libs/sync/inc/syncTimeout.h | 12 ++++++++++++ source/libs/sync/src/syncMain.c | 18 ++++++++++++++++++ source/libs/sync/src/syncRaftLog.c | 25 ++++++++++++++++++++++++- 4 files changed, 57 insertions(+), 4 deletions(-) diff --git a/source/libs/sync/inc/syncIO.h b/source/libs/sync/inc/syncIO.h index a948de8ac1..160fefd086 100644 --- a/source/libs/sync/inc/syncIO.h +++ b/source/libs/sync/inc/syncIO.h @@ -31,11 +31,11 @@ extern "C" { typedef struct SSyncIO { STaosQueue *pMsgQ; - STaosQset *pQset; + STaosQset * pQset; pthread_t consumerTid; - void *serverRpc; - void *clientRpc; + void * serverRpc; + void * clientRpc; SEpSet myAddr; void *ioTimerTickQ; diff --git a/source/libs/sync/inc/syncTimeout.h b/source/libs/sync/inc/syncTimeout.h index 3dda1f212c..efd5aae48e 100644 --- a/source/libs/sync/inc/syncTimeout.h +++ b/source/libs/sync/inc/syncTimeout.h @@ -28,6 +28,18 @@ extern "C" { #include "syncRaft.h" #include "taosdef.h" +// TLA+ Spec +// Timeout(i) == /\ state[i] \in {Follower, Candidate} +// /\ state' = [state EXCEPT ![i] = Candidate] +// /\ currentTerm' = [currentTerm EXCEPT ![i] = currentTerm[i] + 1] +// \* Most implementations would probably just set the local vote +// \* atomically, but messaging localhost for it is weaker. +// /\ votedFor' = [votedFor EXCEPT ![i] = Nil] +// /\ votesResponded' = [votesResponded EXCEPT ![i] = {}] +// /\ votesGranted' = [votesGranted EXCEPT ![i] = {}] +// /\ voterLog' = [voterLog EXCEPT ![i] = [j \in {} |-> <<>>]] +// /\ UNCHANGED <> +// int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg); #ifdef __cplusplus diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index ef30254557..db34d16690 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -363,6 +363,24 @@ static void syncNodeBecomeFollower(SSyncNode* pSyncNode) { syncNodeStartElectTimer(pSyncNode, electMS); } +// TLA+ Spec +// \* Candidate i transitions to leader. +// BecomeLeader(i) == +// /\ state[i] = Candidate +// /\ votesGranted[i] \in Quorum +// /\ state' = [state EXCEPT ![i] = Leader] +// /\ nextIndex' = [nextIndex EXCEPT ![i] = +// [j \in Server |-> Len(log[i]) + 1]] +// /\ matchIndex' = [matchIndex EXCEPT ![i] = +// [j \in Server |-> 0]] +// /\ elections' = elections \cup +// {[eterm |-> currentTerm[i], +// eleader |-> i, +// elog |-> log[i], +// evotes |-> votesGranted[i], +// evoterLog |-> voterLog[i]]} +// /\ UNCHANGED <> +// static void syncNodeBecomeLeader(SSyncNode* pSyncNode) { pSyncNode->state = TAOS_SYNC_STATE_LEADER; pSyncNode->leaderCache = pSyncNode->raftId; diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 37bb3ce48c..e467057c8f 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -20,7 +20,30 @@ int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncBuffer* pBuf) { // get one log entry, user need to free pBuf->data int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncBuffer* pBuf) { return 0; } -// update log store commit index with "index" +// TLA+ Spec +// \* Leader i advances its commitIndex. +// \* This is done as a separate step from handling AppendEntries responses, +// \* in part to minimize atomic regions, and in part so that leaders of +// \* single-server clusters are able to mark entries committed. +// AdvanceCommitIndex(i) == +// /\ state[i] = Leader +// /\ LET \* The set of servers that agree up through index. +// Agree(index) == {i} \cup {k \in Server : +// matchIndex[i][k] >= index} +// \* The maximum indexes for which a quorum agrees +// agreeIndexes == {index \in 1..Len(log[i]) : +// Agree(index) \in Quorum} +// \* New value for commitIndex'[i] +// newCommitIndex == +// IF /\ agreeIndexes /= {} +// /\ log[i][Max(agreeIndexes)].term = currentTerm[i] +// THEN +// Max(agreeIndexes) +// ELSE +// commitIndex[i] +// IN commitIndex' = [commitIndex EXCEPT ![i] = newCommitIndex] +// /\ UNCHANGED <> +// int32_t raftLogupdateCommitIndex(struct SSyncLogStore* pLogStore, SyncIndex index) { return 0; } // truncate log with index, entries after the given index (>index) will be deleted -- GitLab