提交 25af19bd 编写于 作者: M Minghao Li

sync refactor

上级 a0319ae4
...@@ -28,6 +28,8 @@ extern "C" { ...@@ -28,6 +28,8 @@ extern "C" {
#include "syncRaft.h" #include "syncRaft.h"
#include "taosdef.h" #include "taosdef.h"
int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -28,6 +28,8 @@ extern "C" { ...@@ -28,6 +28,8 @@ extern "C" {
#include "syncRaft.h" #include "syncRaft.h"
#include "taosdef.h" #include "taosdef.h"
int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -26,7 +26,11 @@ extern "C" { ...@@ -26,7 +26,11 @@ extern "C" {
#include "syncInt.h" #include "syncInt.h"
#include "taosdef.h" #include "taosdef.h"
void syncNodeElect(SSyncNode* pSyncNode); int32_t syncNodeElect(SSyncNode* pSyncNode);
int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode);
int32_t syncNodeRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -188,29 +188,22 @@ typedef struct SSyncNode { ...@@ -188,29 +188,22 @@ typedef struct SSyncNode {
SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo); SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo);
void syncNodeClose(SSyncNode* pSyncNode); void syncNodeClose(SSyncNode* pSyncNode);
void syncNodePingAll(SSyncNode* pSyncNode);
void syncNodePingPeers(SSyncNode* pSyncNode); int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg);
void syncNodePingSelf(SSyncNode* pSyncNode); int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg);
void syncNodeRequestVotePeers(SSyncNode* pSyncNode); int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg);
void syncNodeAppendEntriesPeers(SSyncNode* pSyncNode); void syncNodePingAll(SSyncNode* pSyncNode);
void syncNodePingPeers(SSyncNode* pSyncNode);
void syncNodePingSelf(SSyncNode* pSyncNode);
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode); int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode); int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode);
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms); int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms);
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode); int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode);
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms); int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms);
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode); int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode); int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg);
int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg);
int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg);
int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg);
int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg);
int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -26,7 +26,9 @@ extern "C" { ...@@ -26,7 +26,9 @@ extern "C" {
#include "syncInt.h" #include "syncInt.h"
#include "taosdef.h" #include "taosdef.h"
void syncNodeAppendEntriesPeers(SSyncNode* pSyncNode); int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode);
int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -28,6 +28,8 @@ extern "C" { ...@@ -28,6 +28,8 @@ extern "C" {
#include "syncRaft.h" #include "syncRaft.h"
#include "taosdef.h" #include "taosdef.h"
int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -28,6 +28,8 @@ extern "C" { ...@@ -28,6 +28,8 @@ extern "C" {
#include "syncRaft.h" #include "syncRaft.h"
#include "taosdef.h" #include "taosdef.h"
int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -15,33 +15,6 @@ ...@@ -15,33 +15,6 @@
#include "syncAppendEntries.h" #include "syncAppendEntries.h"
int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg) {
// TLA+ Spec
// AppendEntries(i, j) ==
// /\ i /= j
// /\ state[i] = Leader
// /\ LET prevLogIndex == nextIndex[i][j] - 1
// prevLogTerm == IF prevLogIndex > 0 THEN
// log[i][prevLogIndex].term
// ELSE
// 0
// \* Send up to 1 entry, constrained by the end of the log.
// lastEntry == Min({Len(log[i]), nextIndex[i][j]})
// entries == SubSeq(log[i], nextIndex[i][j], lastEntry)
// IN Send([mtype |-> AppendEntriesRequest,
// mterm |-> currentTerm[i],
// mprevLogIndex |-> prevLogIndex,
// mprevLogTerm |-> prevLogTerm,
// mentries |-> entries,
// \* mlog is used as a history variable for the proof.
// \* It would not exist in a real implementation.
// mlog |-> log[i],
// mcommitIndex |-> Min({commitIndex[i], lastEntry}),
// msource |-> i,
// mdest |-> j])
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
}
int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
// TLA+ Spec // TLA+ Spec
// HandleAppendEntriesRequest(i, j, m) == // HandleAppendEntriesRequest(i, j, m) ==
......
...@@ -14,8 +14,32 @@ ...@@ -14,8 +14,32 @@
*/ */
#include "syncElection.h" #include "syncElection.h"
#include "syncMessage.h"
void syncNodeElect(SSyncNode* pSyncNode) { int32_t syncNodeElect(SSyncNode* pSyncNode) {
// start election // start election
syncNodeRequestVotePeers(pSyncNode); syncNodeRequestVotePeers(pSyncNode);
} }
// TLA+ Spec
// RequestVote(i, j) ==
// /\ state[i] = Candidate
// /\ j \notin votesResponded[i]
// /\ Send([mtype |-> RequestVoteRequest,
// mterm |-> currentTerm[i],
// mlastLogTerm |-> LastTerm(log[i]),
// mlastLogIndex |-> Len(log[i]),
// msource |-> i,
// mdest |-> j])
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode) {}
int32_t syncNodeRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg) {
sTrace("syncNodeRequestVote pSyncNode:%p ", pSyncNode);
int32_t ret = 0;
SRpcMsg rpcMsg;
syncRequestVote2RpcMsg(pMsg, &rpcMsg);
syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg);
return ret;
}
\ No newline at end of file
...@@ -15,23 +15,23 @@ ...@@ -15,23 +15,23 @@
#include <stdint.h> #include <stdint.h>
#include "sync.h" #include "sync.h"
#include "syncAppendEntries.h"
#include "syncAppendEntriesReply.h"
#include "syncEnv.h" #include "syncEnv.h"
#include "syncInt.h" #include "syncInt.h"
#include "syncRaft.h" #include "syncRaft.h"
#include "syncRequestVote.h"
#include "syncRequestVoteReply.h"
#include "syncTimeout.h" #include "syncTimeout.h"
#include "syncUtil.h" #include "syncUtil.h"
static int32_t tsNodeRefId = -1; static int32_t tsNodeRefId = -1;
// ------ local funciton --------- // ------ local funciton ---------
static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg);
static int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg);
static void syncNodeEqPingTimer(void* param, void* tmrId); static void syncNodeEqPingTimer(void* param, void* tmrId);
static void syncNodeEqElectTimer(void* param, void* tmrId); static void syncNodeEqElectTimer(void* param, void* tmrId);
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId); static void syncNodeEqHeartbeatTimer(void* param, void* tmrId);
static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg);
static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
...@@ -135,6 +135,48 @@ void syncNodeClose(SSyncNode* pSyncNode) { ...@@ -135,6 +135,48 @@ void syncNodeClose(SSyncNode* pSyncNode) {
free(pSyncNode); free(pSyncNode);
} }
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
SEpSet epSet;
syncUtilraftId2EpSet(destRaftId, &epSet);
pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg);
return 0;
}
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
SEpSet epSet;
syncUtilnodeInfo2EpSet(nodeInfo, &epSet);
pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg);
return 0;
}
int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) {
sTrace("syncNodePing pSyncNode:%p ", pSyncNode);
int32_t ret = 0;
SRpcMsg rpcMsg;
syncPing2RpcMsg(pMsg, &rpcMsg);
syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg);
{
cJSON* pJson = syncPing2Json(pMsg);
char* serialized = cJSON_Print(pJson);
sTrace("syncNodePing pMsg:%s ", serialized);
free(serialized);
cJSON_Delete(pJson);
}
{
SyncPing* pMsg2 = rpcMsg.pCont;
cJSON* pJson = syncPing2Json(pMsg2);
char* serialized = cJSON_Print(pJson);
sTrace("syncNodePing rpcMsg.pCont:%s ", serialized);
free(serialized);
cJSON_Delete(pJson);
}
return ret;
}
void syncNodePingAll(SSyncNode* pSyncNode) { void syncNodePingAll(SSyncNode* pSyncNode) {
sTrace("syncNodePingAll pSyncNode:%p ", pSyncNode); sTrace("syncNodePingAll pSyncNode:%p ", pSyncNode);
int32_t ret = 0; int32_t ret = 0;
...@@ -168,10 +210,6 @@ void syncNodePingSelf(SSyncNode* pSyncNode) { ...@@ -168,10 +210,6 @@ void syncNodePingSelf(SSyncNode* pSyncNode) {
syncPingDestroy(pMsg); syncPingDestroy(pMsg);
} }
void syncNodeRequestVotePeers(SSyncNode* pSyncNode) {}
void syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {}
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) { int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser); atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
pSyncNode->pingTimerMS = PING_TIMER_MS; pSyncNode->pingTimerMS = PING_TIMER_MS;
...@@ -235,48 +273,6 @@ int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) { ...@@ -235,48 +273,6 @@ int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
} }
// ------ local funciton --------- // ------ local funciton ---------
static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) {
sTrace("syncNodePing pSyncNode:%p ", pSyncNode);
int32_t ret = 0;
SRpcMsg rpcMsg;
syncPing2RpcMsg(pMsg, &rpcMsg);
syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg);
{
cJSON* pJson = syncPing2Json(pMsg);
char* serialized = cJSON_Print(pJson);
sTrace("syncNodePing pMsg:%s ", serialized);
free(serialized);
cJSON_Delete(pJson);
}
{
SyncPing* pMsg2 = rpcMsg.pCont;
cJSON* pJson = syncPing2Json(pMsg2);
char* serialized = cJSON_Print(pJson);
sTrace("syncNodePing rpcMsg.pCont:%s ", serialized);
free(serialized);
cJSON_Delete(pJson);
}
return ret;
}
static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
SEpSet epSet;
syncUtilraftId2EpSet(destRaftId, &epSet);
pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg);
return 0;
}
static int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
SEpSet epSet;
syncUtilnodeInfo2EpSet(nodeInfo, &epSet);
pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg);
return 0;
}
static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) { static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) {
int32_t ret = 0; int32_t ret = 0;
sTrace("<-- syncNodeOnPingCb -->"); sTrace("<-- syncNodeOnPingCb -->");
......
...@@ -14,5 +14,40 @@ ...@@ -14,5 +14,40 @@
*/ */
#include "syncReplication.h" #include "syncReplication.h"
#include "syncMessage.h"
void syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {} // TLA+ Spec
\ No newline at end of file // AppendEntries(i, j) ==
// /\ i /= j
// /\ state[i] = Leader
// /\ LET prevLogIndex == nextIndex[i][j] - 1
// prevLogTerm == IF prevLogIndex > 0 THEN
// log[i][prevLogIndex].term
// ELSE
// 0
// \* Send up to 1 entry, constrained by the end of the log.
// lastEntry == Min({Len(log[i]), nextIndex[i][j]})
// entries == SubSeq(log[i], nextIndex[i][j], lastEntry)
// IN Send([mtype |-> AppendEntriesRequest,
// mterm |-> currentTerm[i],
// mprevLogIndex |-> prevLogIndex,
// mprevLogTerm |-> prevLogTerm,
// mentries |-> entries,
// \* mlog is used as a history variable for the proof.
// \* It would not exist in a real implementation.
// mlog |-> log[i],
// mcommitIndex |-> Min({commitIndex[i], lastEntry}),
// msource |-> i,
// mdest |-> j])
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {}
int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg) {
sTrace("syncNodeAppendEntries pSyncNode:%p ", pSyncNode);
int32_t ret = 0;
SRpcMsg rpcMsg;
syncAppendEntries2RpcMsg(pMsg, &rpcMsg);
syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg);
return ret;
}
\ No newline at end of file
...@@ -15,20 +15,6 @@ ...@@ -15,20 +15,6 @@
#include "syncRequestVote.h" #include "syncRequestVote.h"
int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg) {
// TLA+ Spec
// RequestVote(i, j) ==
// /\ state[i] = Candidate
// /\ j \notin votesResponded[i]
// /\ Send([mtype |-> RequestVoteRequest,
// mterm |-> currentTerm[i],
// mlastLogTerm |-> LastTerm(log[i]),
// mlastLogIndex |-> Len(log[i]),
// msource |-> i,
// mdest |-> j])
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
}
int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) { int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {
// TLA+ Spec // TLA+ Spec
// HandleRequestVoteRequest(i, j, m) == // HandleRequestVoteRequest(i, j, m) ==
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#include "syncTimeout.h" #include "syncTimeout.h"
#include "syncElection.h" #include "syncElection.h"
#include "syncReplication.h"
int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) {
int32_t ret = 0; int32_t ret = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册