提交 64d224a0 编写于 作者: M Minghao Li

syncInt

上级 8d15d79f
...@@ -34,9 +34,7 @@ typedef enum { ...@@ -34,9 +34,7 @@ typedef enum {
TAOS_SYNC_STATE_FOLLOWER = 0, TAOS_SYNC_STATE_FOLLOWER = 0,
TAOS_SYNC_STATE_CANDIDATE = 1, TAOS_SYNC_STATE_CANDIDATE = 1,
TAOS_SYNC_STATE_LEADER = 2, TAOS_SYNC_STATE_LEADER = 2,
} ESyncRole; } ESyncState;
typedef ESyncRole ESyncState;
typedef struct SSyncBuffer { typedef struct SSyncBuffer {
void* data; void* data;
......
...@@ -25,6 +25,7 @@ extern "C" { ...@@ -25,6 +25,7 @@ extern "C" {
#include <stdlib.h> #include <stdlib.h>
#include "sync.h" #include "sync.h"
#include "taosdef.h" #include "taosdef.h"
#include "tglobal.h"
#include "tlog.h" #include "tlog.h"
#include "ttimer.h" #include "ttimer.h"
...@@ -91,31 +92,61 @@ typedef struct SyncAppendEntriesReply SyncAppendEntriesReply; ...@@ -91,31 +92,61 @@ typedef struct SyncAppendEntriesReply SyncAppendEntriesReply;
struct SSyncEnv; struct SSyncEnv;
typedef struct SSyncEnv SSyncEnv; typedef struct SSyncEnv SSyncEnv;
struct SRaftStore;
typedef struct SRaftStore SRaftStore;
struct SVotesGranted;
typedef struct SVotesGranted SVotesGranted;
struct SVotesResponded;
typedef struct SVotesResponded SVotesResponded;
typedef struct SRaftId { typedef struct SRaftId {
SyncNodeId addr; // typedef uint64_t SyncNodeId; SyncNodeId addr; // typedef uint64_t SyncNodeId;
SyncGroupId vgId; // typedef int32_t SyncGroupId; SyncGroupId vgId; // typedef int32_t SyncGroupId;
} SRaftId; } SRaftId;
typedef struct SSyncNode { typedef struct SSyncNode {
// init by SSyncInfo
SyncGroupId vgId; SyncGroupId vgId;
SSyncCfg syncCfg; SSyncCfg syncCfg;
char path[TSDB_FILENAME_LEN]; char path[TSDB_FILENAME_LEN];
SSyncFSM* pFsm; void* rpcClient;
// passed from outside
void* rpcClient;
int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg); int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg);
int32_t refCount; // init internal
int64_t rid;
SNodeInfo me; SNodeInfo me;
SNodeInfo peers[TSDB_MAX_REPLICA];
int32_t peersNum; int32_t peersNum;
SNodeInfo peers[TSDB_MAX_REPLICA];
ESyncRole role; // raft algorithm
SSyncFSM* pFsm;
SRaftId raftId; SRaftId raftId;
SRaftId peersId[TSDB_MAX_REPLICA];
int32_t replicaNum;
int32_t quorum;
// life cycle
int32_t refCount;
int64_t rid;
// tla+ server vars
ESyncState state;
SRaftStore* pRaftStore;
// tla+ candidate vars
SVotesGranted* pVotesGranted;
SVotesResponded* pVotesResponded;
// tla+ leader vars
SHashObj* pNextIndex;
SHashObj* pMatchIndex;
// tla+ log vars
SSyncLogStore* pLogStore;
SyncIndex commitIndex;
// timer
tmr_h pPingTimer; tmr_h pPingTimer;
int32_t pingTimerMS; int32_t pingTimerMS;
uint8_t pingTimerStart; uint8_t pingTimerStart;
...@@ -136,32 +167,21 @@ typedef struct SSyncNode { ...@@ -136,32 +167,21 @@ typedef struct SSyncNode {
// callback // callback
int32_t (*FpOnPing)(SSyncNode* ths, SyncPing* pMsg); int32_t (*FpOnPing)(SSyncNode* ths, SyncPing* pMsg);
int32_t (*FpOnPingReply)(SSyncNode* ths, SyncPingReply* pMsg); int32_t (*FpOnPingReply)(SSyncNode* ths, SyncPingReply* pMsg);
int32_t (*FpOnRequestVote)(SSyncNode* ths, SyncRequestVote* pMsg); int32_t (*FpOnRequestVote)(SSyncNode* ths, SyncRequestVote* pMsg);
int32_t (*FpOnRequestVoteReply)(SSyncNode* ths, SyncRequestVoteReply* pMsg); int32_t (*FpOnRequestVoteReply)(SSyncNode* ths, SyncRequestVoteReply* pMsg);
int32_t (*FpOnAppendEntries)(SSyncNode* ths, SyncAppendEntries* pMsg); int32_t (*FpOnAppendEntries)(SSyncNode* ths, SyncAppendEntries* pMsg);
int32_t (*FpOnAppendEntriesReply)(SSyncNode* ths, SyncAppendEntriesReply* pMsg); int32_t (*FpOnAppendEntriesReply)(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
} SSyncNode; } SSyncNode;
SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo); SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo);
void syncNodeClose(SSyncNode* pSyncNode);
void syncNodeClose(SSyncNode* pSyncNode); void syncNodePingAll(SSyncNode* pSyncNode);
void syncNodePingPeers(SSyncNode* pSyncNode);
void syncNodePingAll(SSyncNode* pSyncNode); void syncNodePingSelf(SSyncNode* pSyncNode);
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode);
void syncNodePingPeers(SSyncNode* pSyncNode); int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode);
void syncNodePingSelf(SSyncNode* pSyncNode);
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -26,6 +26,12 @@ extern "C" { ...@@ -26,6 +26,12 @@ extern "C" {
#include "syncInt.h" #include "syncInt.h"
#include "taosdef.h" #include "taosdef.h"
typedef struct SVotesGranted {
} SVotesGranted;
typedef struct SVotesResponded {
} SVotesResponded;
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -88,7 +88,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { ...@@ -88,7 +88,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
} }
} }
pSyncNode->role = TAOS_SYNC_STATE_FOLLOWER; pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
syncUtilnodeInfo2raftId(&pSyncNode->me, pSyncNode->vgId, &pSyncNode->raftId); syncUtilnodeInfo2raftId(&pSyncNode->me, pSyncNode->vgId, &pSyncNode->raftId);
pSyncNode->pPingTimer = NULL; pSyncNode->pPingTimer = NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册