未验证 提交 ce538a0a 编写于 作者: L Li Minghao 提交者: GitHub

Merge pull request #10592 from taosdata/feature/3.0_mhli

sync refactor
...@@ -31,11 +31,11 @@ extern "C" { ...@@ -31,11 +31,11 @@ extern "C" {
typedef struct SSyncIO { typedef struct SSyncIO {
STaosQueue *pMsgQ; STaosQueue *pMsgQ;
STaosQset *pQset; STaosQset * pQset;
pthread_t consumerTid; pthread_t consumerTid;
void *serverRpc; void * serverRpc;
void *clientRpc; void * clientRpc;
SEpSet myAddr; SEpSet myAddr;
void *ioTimerTickQ; void *ioTimerTickQ;
......
...@@ -28,6 +28,18 @@ extern "C" { ...@@ -28,6 +28,18 @@ extern "C" {
#include "syncRaft.h" #include "syncRaft.h"
#include "taosdef.h" #include "taosdef.h"
// TLA+ Spec
// Timeout(i) == /\ state[i] \in {Follower, Candidate}
// /\ state' = [state EXCEPT ![i] = Candidate]
// /\ currentTerm' = [currentTerm EXCEPT ![i] = currentTerm[i] + 1]
// \* Most implementations would probably just set the local vote
// \* atomically, but messaging localhost for it is weaker.
// /\ votedFor' = [votedFor EXCEPT ![i] = Nil]
// /\ votesResponded' = [votesResponded EXCEPT ![i] = {}]
// /\ votesGranted' = [votesGranted EXCEPT ![i] = {}]
// /\ voterLog' = [voterLog EXCEPT ![i] = [j \in {} |-> <<>>]]
// /\ UNCHANGED <<messages, leaderVars, logVars>>
//
int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg); int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -363,6 +363,24 @@ static void syncNodeBecomeFollower(SSyncNode* pSyncNode) { ...@@ -363,6 +363,24 @@ static void syncNodeBecomeFollower(SSyncNode* pSyncNode) {
syncNodeStartElectTimer(pSyncNode, electMS); syncNodeStartElectTimer(pSyncNode, electMS);
} }
// TLA+ Spec
// \* Candidate i transitions to leader.
// BecomeLeader(i) ==
// /\ state[i] = Candidate
// /\ votesGranted[i] \in Quorum
// /\ state' = [state EXCEPT ![i] = Leader]
// /\ nextIndex' = [nextIndex EXCEPT ![i] =
// [j \in Server |-> Len(log[i]) + 1]]
// /\ matchIndex' = [matchIndex EXCEPT ![i] =
// [j \in Server |-> 0]]
// /\ elections' = elections \cup
// {[eterm |-> currentTerm[i],
// eleader |-> i,
// elog |-> log[i],
// evotes |-> votesGranted[i],
// evoterLog |-> voterLog[i]]}
// /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
//
static void syncNodeBecomeLeader(SSyncNode* pSyncNode) { static void syncNodeBecomeLeader(SSyncNode* pSyncNode) {
pSyncNode->state = TAOS_SYNC_STATE_LEADER; pSyncNode->state = TAOS_SYNC_STATE_LEADER;
pSyncNode->leaderCache = pSyncNode->raftId; pSyncNode->leaderCache = pSyncNode->raftId;
......
...@@ -20,7 +20,30 @@ int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncBuffer* pBuf) { ...@@ -20,7 +20,30 @@ int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncBuffer* pBuf) {
// get one log entry, user need to free pBuf->data // get one log entry, user need to free pBuf->data
int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncBuffer* pBuf) { return 0; } int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncBuffer* pBuf) { return 0; }
// update log store commit index with "index" // TLA+ Spec
// \* Leader i advances its commitIndex.
// \* This is done as a separate step from handling AppendEntries responses,
// \* in part to minimize atomic regions, and in part so that leaders of
// \* single-server clusters are able to mark entries committed.
// AdvanceCommitIndex(i) ==
// /\ state[i] = Leader
// /\ LET \* The set of servers that agree up through index.
// Agree(index) == {i} \cup {k \in Server :
// matchIndex[i][k] >= index}
// \* The maximum indexes for which a quorum agrees
// agreeIndexes == {index \in 1..Len(log[i]) :
// Agree(index) \in Quorum}
// \* New value for commitIndex'[i]
// newCommitIndex ==
// IF /\ agreeIndexes /= {}
// /\ log[i][Max(agreeIndexes)].term = currentTerm[i]
// THEN
// Max(agreeIndexes)
// ELSE
// commitIndex[i]
// IN commitIndex' = [commitIndex EXCEPT ![i] = newCommitIndex]
// /\ UNCHANGED <<messages, serverVars, candidateVars, leaderVars, log>>
//
int32_t raftLogupdateCommitIndex(struct SSyncLogStore* pLogStore, SyncIndex index) { return 0; } int32_t raftLogupdateCommitIndex(struct SSyncLogStore* pLogStore, SyncIndex index) { return 0; }
// truncate log with index, entries after the given index (>index) will be deleted // truncate log with index, entries after the given index (>index) will be deleted
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册