提交 fa8284af 编写于 作者: M Minghao Li

sync refactor

上级 f76ca26c
...@@ -46,12 +46,9 @@ typedef struct SSyncEnv { ...@@ -46,12 +46,9 @@ typedef struct SSyncEnv {
extern SSyncEnv* gSyncEnv; extern SSyncEnv* gSyncEnv;
int32_t syncEnvStart(); int32_t syncEnvStart();
int32_t syncEnvStop(); int32_t syncEnvStop();
tmr_h syncEnvStartTimer(TAOS_TMR_CALLBACK fp, int mseconds, void* param);
tmr_h syncEnvStartTimer(TAOS_TMR_CALLBACK fp, int mseconds, void* param); void syncEnvStopTimer(tmr_h* pTimer);
void syncEnvStopTimer(tmr_h* pTimer);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -32,28 +32,18 @@ extern "C" { ...@@ -32,28 +32,18 @@ extern "C" {
#define RAFT_STORE_PATH_LEN 128 #define RAFT_STORE_PATH_LEN 128
typedef struct SRaftStore { typedef struct SRaftStore {
SyncTerm currentTerm; SyncTerm currentTerm;
SRaftId voteFor; SRaftId voteFor;
// FileFd fd;
TdFilePtr pFile; TdFilePtr pFile;
char path[RAFT_STORE_PATH_LEN]; char path[RAFT_STORE_PATH_LEN];
} SRaftStore; } SRaftStore;
SRaftStore *raftStoreOpen(const char *path); SRaftStore *raftStoreOpen(const char *path);
int32_t raftStoreClose(SRaftStore *pRaftStore);
static int32_t raftStoreInit(SRaftStore *pRaftStore); int32_t raftStorePersist(SRaftStore *pRaftStore);
int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len);
int32_t raftStoreClose(SRaftStore *pRaftStore); int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len);
void raftStorePrint(SRaftStore *pRaftStore);
int32_t raftStorePersist(SRaftStore *pRaftStore);
static bool raftStoreFileExist(char *path);
int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len);
int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len);
void raftStorePrint(SRaftStore *pRaftStore);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -15,70 +15,69 @@ ...@@ -15,70 +15,69 @@
#include "syncAppendEntries.h" #include "syncAppendEntries.h"
int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { // TLA+ Spec
// TLA+ Spec // HandleAppendEntriesRequest(i, j, m) ==
// HandleAppendEntriesRequest(i, j, m) == // LET logOk == \/ m.mprevLogIndex = 0
// LET logOk == \/ m.mprevLogIndex = 0 // \/ /\ m.mprevLogIndex > 0
// \/ /\ m.mprevLogIndex > 0 // /\ m.mprevLogIndex <= Len(log[i])
// /\ m.mprevLogIndex <= Len(log[i]) // /\ m.mprevLogTerm = log[i][m.mprevLogIndex].term
// /\ m.mprevLogTerm = log[i][m.mprevLogIndex].term // IN /\ m.mterm <= currentTerm[i]
// IN /\ m.mterm <= currentTerm[i] // /\ \/ /\ \* reject request
// /\ \/ /\ \* reject request // \/ m.mterm < currentTerm[i]
// \/ m.mterm < currentTerm[i] // \/ /\ m.mterm = currentTerm[i]
// \/ /\ m.mterm = currentTerm[i] // /\ state[i] = Follower
// /\ state[i] = Follower // /\ \lnot logOk
// /\ \lnot logOk // /\ Reply([mtype |-> AppendEntriesResponse,
// /\ Reply([mtype |-> AppendEntriesResponse, // mterm |-> currentTerm[i],
// mterm |-> currentTerm[i], // msuccess |-> FALSE,
// msuccess |-> FALSE, // mmatchIndex |-> 0,
// mmatchIndex |-> 0, // msource |-> i,
// msource |-> i, // mdest |-> j],
// mdest |-> j], // m)
// m) // /\ UNCHANGED <<serverVars, logVars>>
// /\ UNCHANGED <<serverVars, logVars>> // \/ \* return to follower state
// \/ \* return to follower state // /\ m.mterm = currentTerm[i]
// /\ m.mterm = currentTerm[i] // /\ state[i] = Candidate
// /\ state[i] = Candidate // /\ state' = [state EXCEPT ![i] = Follower]
// /\ state' = [state EXCEPT ![i] = Follower] // /\ UNCHANGED <<currentTerm, votedFor, logVars, messages>>
// /\ UNCHANGED <<currentTerm, votedFor, logVars, messages>> // \/ \* accept request
// \/ \* accept request // /\ m.mterm = currentTerm[i]
// /\ m.mterm = currentTerm[i] // /\ state[i] = Follower
// /\ state[i] = Follower // /\ logOk
// /\ logOk // /\ LET index == m.mprevLogIndex + 1
// /\ LET index == m.mprevLogIndex + 1 // IN \/ \* already done with request
// IN \/ \* already done with request // /\ \/ m.mentries = << >>
// /\ \/ m.mentries = << >> // \/ /\ m.mentries /= << >>
// \/ /\ m.mentries /= << >> // /\ Len(log[i]) >= index
// /\ Len(log[i]) >= index // /\ log[i][index].term = m.mentries[1].term
// /\ log[i][index].term = m.mentries[1].term // \* This could make our commitIndex decrease (for
// \* This could make our commitIndex decrease (for // \* example if we process an old, duplicated request),
// \* example if we process an old, duplicated request), // \* but that doesn't really affect anything.
// \* but that doesn't really affect anything. // /\ commitIndex' = [commitIndex EXCEPT ![i] =
// /\ commitIndex' = [commitIndex EXCEPT ![i] = // m.mcommitIndex]
// m.mcommitIndex] // /\ Reply([mtype |-> AppendEntriesResponse,
// /\ Reply([mtype |-> AppendEntriesResponse, // mterm |-> currentTerm[i],
// mterm |-> currentTerm[i], // msuccess |-> TRUE,
// msuccess |-> TRUE, // mmatchIndex |-> m.mprevLogIndex +
// mmatchIndex |-> m.mprevLogIndex + // Len(m.mentries),
// Len(m.mentries), // msource |-> i,
// msource |-> i, // mdest |-> j],
// mdest |-> j], // m)
// m) // /\ UNCHANGED <<serverVars, log>>
// /\ UNCHANGED <<serverVars, log>> // \/ \* conflict: remove 1 entry
// \/ \* conflict: remove 1 entry // /\ m.mentries /= << >>
// /\ m.mentries /= << >> // /\ Len(log[i]) >= index
// /\ Len(log[i]) >= index // /\ log[i][index].term /= m.mentries[1].term
// /\ log[i][index].term /= m.mentries[1].term // /\ LET new == [index2 \in 1..(Len(log[i]) - 1) |->
// /\ LET new == [index2 \in 1..(Len(log[i]) - 1) |-> // log[i][index2]]
// log[i][index2]] // IN log' = [log EXCEPT ![i] = new]
// IN log' = [log EXCEPT ![i] = new] // /\ UNCHANGED <<serverVars, commitIndex, messages>>
// /\ UNCHANGED <<serverVars, commitIndex, messages>> // \/ \* no conflict: append entry
// \/ \* no conflict: append entry // /\ m.mentries /= << >>
// /\ m.mentries /= << >> // /\ Len(log[i]) = m.mprevLogIndex
// /\ Len(log[i]) = m.mprevLogIndex // /\ log' = [log EXCEPT ![i] =
// /\ log' = [log EXCEPT ![i] = // Append(log[i], m.mentries[1])]
// Append(log[i], m.mentries[1])] // /\ UNCHANGED <<serverVars, commitIndex, messages>>
// /\ UNCHANGED <<serverVars, commitIndex, messages>> // /\ UNCHANGED <<candidateVars, leaderVars>>
// /\ UNCHANGED <<candidateVars, leaderVars>> //
// int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {}
}
...@@ -15,17 +15,16 @@ ...@@ -15,17 +15,16 @@
#include "syncAppendEntriesReply.h" #include "syncAppendEntriesReply.h"
int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { // TLA+ Spec
// TLA+ Spec // HandleAppendEntriesResponse(i, j, m) ==
// HandleAppendEntriesResponse(i, j, m) == // /\ m.mterm = currentTerm[i]
// /\ m.mterm = currentTerm[i] // /\ \/ /\ m.msuccess \* successful
// /\ \/ /\ m.msuccess \* successful // /\ nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1]
// /\ nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1] // /\ matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex]
// /\ matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex] // \/ /\ \lnot m.msuccess \* not successful
// \/ /\ \lnot m.msuccess \* not successful // /\ nextIndex' = [nextIndex EXCEPT ![i][j] =
// /\ nextIndex' = [nextIndex EXCEPT ![i][j] = // Max({nextIndex[i][j] - 1, 1})]
// Max({nextIndex[i][j] - 1, 1})] // /\ UNCHANGED <<matchIndex>>
// /\ UNCHANGED <<matchIndex>> // /\ Discard(m)
// /\ Discard(m) // /\ UNCHANGED <<serverVars, candidateVars, logVars, elections>>
// /\ UNCHANGED <<serverVars, candidateVars, logVars, elections>> int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {}
}
...@@ -16,8 +16,11 @@ ...@@ -16,8 +16,11 @@
#include "syncRaftStore.h" #include "syncRaftStore.h"
#include "cJSON.h" #include "cJSON.h"
// to complie success: FileIO interface is modified // private function
static int32_t raftStoreInit(SRaftStore *pRaftStore);
static bool raftStoreFileExist(char *path);
// public function
SRaftStore *raftStoreOpen(const char *path) { SRaftStore *raftStoreOpen(const char *path) {
int32_t ret; int32_t ret;
...@@ -137,121 +140,3 @@ void raftStorePrint(SRaftStore *pRaftStore) { ...@@ -137,121 +140,3 @@ void raftStorePrint(SRaftStore *pRaftStore) {
raftStoreSerialize(pRaftStore, storeBuf, sizeof(storeBuf)); raftStoreSerialize(pRaftStore, storeBuf, sizeof(storeBuf));
printf("%s\n", storeBuf); printf("%s\n", storeBuf);
} }
#if 0
SRaftStore *raftStoreOpen(const char *path) {
int32_t ret;
SRaftStore *pRaftStore = malloc(sizeof(SRaftStore));
if (pRaftStore == NULL) {
sError("raftStoreOpen malloc error");
return NULL;
}
memset(pRaftStore, 0, sizeof(*pRaftStore));
snprintf(pRaftStore->path, sizeof(pRaftStore->path), "%s", path);
char storeBuf[RAFT_STORE_BLOCK_SIZE];
memset(storeBuf, 0, sizeof(storeBuf));
if (!raftStoreFileExist(pRaftStore->path)) {
ret = raftStoreInit(pRaftStore);
assert(ret == 0);
}
pRaftStore->fd = taosOpenFileReadWrite(pRaftStore->path);
if (pRaftStore->fd < 0) {
return NULL;
}
int len = taosReadFile(pRaftStore->fd, storeBuf, sizeof(storeBuf));
assert(len == RAFT_STORE_BLOCK_SIZE);
ret = raftStoreDeserialize(pRaftStore, storeBuf, len);
assert(ret == 0);
return pRaftStore;
}
static int32_t raftStoreInit(SRaftStore *pRaftStore) {
pRaftStore->fd = taosOpenFileCreateWrite(pRaftStore->path);
if (pRaftStore->fd < 0) {
return -1;
}
pRaftStore->currentTerm = 0;
pRaftStore->voteFor.addr = 0;
pRaftStore->voteFor.vgId = 0;
int32_t ret = raftStorePersist(pRaftStore);
assert(ret == 0);
taosCloseFile(pRaftStore->fd);
return 0;
}
int32_t raftStoreClose(SRaftStore *pRaftStore) {
taosCloseFile(pRaftStore->fd);
free(pRaftStore);
return 0;
}
int32_t raftStorePersist(SRaftStore *pRaftStore) {
int32_t ret;
char storeBuf[RAFT_STORE_BLOCK_SIZE];
ret = raftStoreSerialize(pRaftStore, storeBuf, sizeof(storeBuf));
assert(ret == 0);
taosLSeekFile(pRaftStore->fd, 0, SEEK_SET);
ret = taosWriteFile(pRaftStore->fd, storeBuf, sizeof(storeBuf));
assert(ret == RAFT_STORE_BLOCK_SIZE);
fsync(pRaftStore->fd);
return 0;
}
static bool raftStoreFileExist(char *path) { return taosStatFile(path, NULL, NULL) >= 0; }
int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) {
cJSON *pRoot = cJSON_CreateObject();
cJSON_AddNumberToObject(pRoot, "current_term", pRaftStore->currentTerm);
cJSON_AddNumberToObject(pRoot, "vote_for_addr", pRaftStore->voteFor.addr);
cJSON_AddNumberToObject(pRoot, "vote_for_vgid", pRaftStore->voteFor.vgId);
char *serialized = cJSON_Print(pRoot);
int len2 = strlen(serialized);
assert(len2 < len);
memset(buf, 0, len);
snprintf(buf, len, "%s", serialized);
free(serialized);
cJSON_Delete(pRoot);
return 0;
}
int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) {
assert(len > 0 && len <= RAFT_STORE_BLOCK_SIZE);
cJSON *pRoot = cJSON_Parse(buf);
cJSON *pCurrentTerm = cJSON_GetObjectItem(pRoot, "current_term");
pRaftStore->currentTerm = pCurrentTerm->valueint;
cJSON *pVoteForAddr = cJSON_GetObjectItem(pRoot, "vote_for_addr");
pRaftStore->voteFor.addr = pVoteForAddr->valueint;
cJSON *pVoteForVgid = cJSON_GetObjectItem(pRoot, "vote_for_vgid");
pRaftStore->voteFor.vgId = pVoteForVgid->valueint;
cJSON_Delete(pRoot);
return 0;
}
void raftStorePrint(SRaftStore *pRaftStore) {
char storeBuf[RAFT_STORE_BLOCK_SIZE];
raftStoreSerialize(pRaftStore, storeBuf, sizeof(storeBuf));
printf("%s\n", storeBuf);
}
#endif
...@@ -15,26 +15,25 @@ ...@@ -15,26 +15,25 @@
#include "syncRequestVote.h" #include "syncRequestVote.h"
int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) { // TLA+ Spec
// TLA+ Spec // HandleRequestVoteRequest(i, j, m) ==
// HandleRequestVoteRequest(i, j, m) == // LET logOk == \/ m.mlastLogTerm > LastTerm(log[i])
// LET logOk == \/ m.mlastLogTerm > LastTerm(log[i]) // \/ /\ m.mlastLogTerm = LastTerm(log[i])
// \/ /\ m.mlastLogTerm = LastTerm(log[i]) // /\ m.mlastLogIndex >= Len(log[i])
// /\ m.mlastLogIndex >= Len(log[i]) // grant == /\ m.mterm = currentTerm[i]
// grant == /\ m.mterm = currentTerm[i] // /\ logOk
// /\ logOk // /\ votedFor[i] \in {Nil, j}
// /\ votedFor[i] \in {Nil, j} // IN /\ m.mterm <= currentTerm[i]
// IN /\ m.mterm <= currentTerm[i] // /\ \/ grant /\ votedFor' = [votedFor EXCEPT ![i] = j]
// /\ \/ grant /\ votedFor' = [votedFor EXCEPT ![i] = j] // \/ ~grant /\ UNCHANGED votedFor
// \/ ~grant /\ UNCHANGED votedFor // /\ Reply([mtype |-> RequestVoteResponse,
// /\ Reply([mtype |-> RequestVoteResponse, // mterm |-> currentTerm[i],
// mterm |-> currentTerm[i], // mvoteGranted |-> grant,
// mvoteGranted |-> grant, // \* mlog is used just for the `elections' history variable for
// \* mlog is used just for the `elections' history variable for // \* the proof. It would not exist in a real implementation.
// \* the proof. It would not exist in a real implementation. // mlog |-> log[i],
// mlog |-> log[i], // msource |-> i,
// msource |-> i, // mdest |-> j],
// mdest |-> j], // m)
// m) // /\ UNCHANGED <<state, currentTerm, candidateVars, leaderVars, logVars>>
// /\ UNCHANGED <<state, currentTerm, candidateVars, leaderVars, logVars>> int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {}
}
...@@ -15,21 +15,20 @@ ...@@ -15,21 +15,20 @@
#include "syncRequestVoteReply.h" #include "syncRequestVoteReply.h"
int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) { // TLA+ Spec
// TLA+ Spec // HandleRequestVoteResponse(i, j, m) ==
// HandleRequestVoteResponse(i, j, m) == // \* This tallies votes even when the current state is not Candidate, but
// \* This tallies votes even when the current state is not Candidate, but // \* they won't be looked at, so it doesn't matter.
// \* they won't be looked at, so it doesn't matter. // /\ m.mterm = currentTerm[i]
// /\ m.mterm = currentTerm[i] // /\ votesResponded' = [votesResponded EXCEPT ![i] =
// /\ votesResponded' = [votesResponded EXCEPT ![i] = // votesResponded[i] \cup {j}]
// votesResponded[i] \cup {j}] // /\ \/ /\ m.mvoteGranted
// /\ \/ /\ m.mvoteGranted // /\ votesGranted' = [votesGranted EXCEPT ![i] =
// /\ votesGranted' = [votesGranted EXCEPT ![i] = // votesGranted[i] \cup {j}]
// votesGranted[i] \cup {j}] // /\ voterLog' = [voterLog EXCEPT ![i] =
// /\ voterLog' = [voterLog EXCEPT ![i] = // voterLog[i] @@ (j :> m.mlog)]
// voterLog[i] @@ (j :> m.mlog)] // \/ /\ ~m.mvoteGranted
// \/ /\ ~m.mvoteGranted // /\ UNCHANGED <<votesGranted, voterLog>>
// /\ UNCHANGED <<votesGranted, voterLog>> // /\ Discard(m)
// /\ Discard(m) // /\ UNCHANGED <<serverVars, votedFor, leaderVars, logVars>>
// /\ UNCHANGED <<serverVars, votedFor, leaderVars, logVars>> int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) {}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册