提交 dbe28cd4 编写于 作者: M Minghao Li

add sync code

上级 e8780ceb
...@@ -131,6 +131,9 @@ typedef struct SStateMgr { ...@@ -131,6 +131,9 @@ typedef struct SStateMgr {
typedef struct SSyncInfo { typedef struct SSyncInfo {
SyncGroupId vgId; SyncGroupId vgId;
SSyncCfg syncCfg; SSyncCfg syncCfg;
char path[TSDB_FILENAME_LEN];
SSyncFSM* pFsm;
} SSyncInfo; } SSyncInfo;
struct SSyncNode; struct SSyncNode;
......
...@@ -23,6 +23,7 @@ extern "C" { ...@@ -23,6 +23,7 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include "syncMessage.h"
#include "taosdef.h" #include "taosdef.h"
#define sFatal(...) \ #define sFatal(...) \
...@@ -62,16 +63,60 @@ extern "C" { ...@@ -62,16 +63,60 @@ extern "C" {
} \ } \
} }
struct SRaft;
typedef struct SSyncNode { typedef struct SSyncNode {
char path[TSDB_FILENAME_LEN]; int8_t replica;
int8_t replica; int8_t quorum;
int8_t quorum;
int8_t selfIndex; SyncGroupId vgId;
uint32_t vgId; SSyncCfg syncCfg;
int32_t refCount; char path[TSDB_FILENAME_LEN];
int64_t rid;
struct SRaft* pRaft;
int32_t (*FpPing)(struct SSyncNode* ths, const SyncPing* pMsg);
int32_t (*FpOnPing)(struct SSyncNode* ths, SyncPing* pMsg);
int32_t (*FpOnPingReply)(struct SSyncNode* ths, SyncPingReply* pMsg);
int32_t (*FpRequestVote)(struct SSyncNode* ths, const SyncRequestVote* pMsg);
int32_t (*FpOnRequestVote)(struct SSyncNode* ths, SyncRequestVote* pMsg);
int32_t (*FpOnRequestVoteReply)(struct SSyncNode* ths, SyncRequestVoteReply* pMsg);
int32_t (*FpAppendEntries)(struct SSyncNode* ths, const SyncAppendEntries* pMsg);
int32_t (*FpOnAppendEntries)(struct SSyncNode* ths, SyncAppendEntries* pMsg);
int32_t (*FpOnAppendEntriesReply)(struct SSyncNode* ths, SyncAppendEntriesReply* pMsg);
} SSyncNode; } SSyncNode;
SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo);
void syncNodeClose(SSyncNode* pSyncNode);
static int32_t doSyncNodePing(struct SSyncNode* ths, const SyncPing* pMsg);
static int32_t onSyncNodePing(struct SSyncNode* ths, SyncPing* pMsg);
static int32_t onSyncNodePingReply(struct SSyncNode* ths, SyncPingReply* pMsg);
static int32_t doSyncNodeRequestVote(struct SSyncNode* ths, const SyncRequestVote* pMsg);
static int32_t onSyncNodeRequestVote(struct SSyncNode* ths, SyncRequestVote* pMsg);
static int32_t onSyncNodeRequestVoteReply(struct SSyncNode* ths, SyncRequestVoteReply* pMsg);
static int32_t doSyncNodeAppendEntries(struct SSyncNode* ths, const SyncAppendEntries* pMsg);
static int32_t onSyncNodeAppendEntries(struct SSyncNode* ths, SyncAppendEntries* pMsg);
static int32_t onSyncNodeAppendEntriesReply(struct SSyncNode* ths, SyncAppendEntriesReply* pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -43,7 +43,6 @@ typedef struct SyncPing { ...@@ -43,7 +43,6 @@ typedef struct SyncPing {
const SSyncBuffer *pData; const SSyncBuffer *pData;
} SyncPing, RaftPing; } SyncPing, RaftPing;
typedef struct SyncPingReply { typedef struct SyncPingReply {
ESyncMessageType msgType; ESyncMessageType msgType;
const SSyncBuffer *pData; const SSyncBuffer *pData;
...@@ -63,7 +62,6 @@ typedef struct SyncClientRequestReply { ...@@ -63,7 +62,6 @@ typedef struct SyncClientRequestReply {
const SSyncBuffer *pLeaderHint; const SSyncBuffer *pLeaderHint;
} SyncClientRequestReply, RaftClientRequestReply; } SyncClientRequestReply, RaftClientRequestReply;
typedef struct SyncRequestVote { typedef struct SyncRequestVote {
ESyncMessageType msgType; ESyncMessageType msgType;
SyncTerm currentTerm; SyncTerm currentTerm;
......
...@@ -33,8 +33,8 @@ typedef struct SRaftId { ...@@ -33,8 +33,8 @@ typedef struct SRaftId {
} SRaftId; } SRaftId;
typedef struct SRaft { typedef struct SRaft {
SRaftId id; SRaftId id;
void* data; SSyncFSM* pFsm;
int32_t (*FpPing)(struct SRaft* ths, const RaftPing* pMsg); int32_t (*FpPing)(struct SRaft* ths, const RaftPing* pMsg);
...@@ -56,7 +56,9 @@ typedef struct SRaft { ...@@ -56,7 +56,9 @@ typedef struct SRaft {
} SRaft; } SRaft;
SRaft* raftCreate(SRaftId raftId, void* data); SRaft* raftOpen(SRaftId raftId, SSyncFSM* pFsm);
void raftClose(SRaft* pRaft);
static int32_t doRaftPing(struct SRaft* ths, const RaftPing* pMsg); static int32_t doRaftPing(struct SRaft* ths, const RaftPing* pMsg);
......
...@@ -16,12 +16,17 @@ ...@@ -16,12 +16,17 @@
#include <stdint.h> #include <stdint.h>
#include "sync.h" #include "sync.h"
#include "syncInt.h" #include "syncInt.h"
#include "syncRaft.h"
int32_t syncInit() { return 0; } int32_t syncInit() { return 0; }
void syncCleanUp() {} void syncCleanUp() {}
int64_t syncStart(const SSyncInfo* pSyncInfo) { return 0; } int64_t syncStart(const SSyncInfo* pSyncInfo) {
SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo);
assert(pSyncNode != NULL);
return 0;
}
void syncStop(int64_t rid) {} void syncStop(int64_t rid) {}
...@@ -31,4 +36,72 @@ int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak) { r ...@@ -31,4 +36,72 @@ int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak) { r
ESyncState syncGetMyRole(int64_t rid) { return TAOS_SYNC_STATE_LEADER; } ESyncState syncGetMyRole(int64_t rid) { return TAOS_SYNC_STATE_LEADER; }
void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole) {} void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole) {}
\ No newline at end of file
SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
SSyncNode* pSyncNode = (SSyncNode*)malloc(sizeof(SSyncNode));
assert(pSyncNode != NULL);
pSyncNode->FpPing = doSyncNodePing;
pSyncNode->FpOnPing = onSyncNodePing;
pSyncNode->FpOnPingReply = onSyncNodePingReply;
pSyncNode->FpRequestVote = doSyncNodeRequestVote;
pSyncNode->FpOnRequestVote = onSyncNodeRequestVote;
pSyncNode->FpOnRequestVoteReply = onSyncNodeRequestVoteReply;
pSyncNode->FpAppendEntries = doSyncNodeAppendEntries;
pSyncNode->FpOnAppendEntries = onSyncNodeAppendEntries;
pSyncNode->FpOnAppendEntriesReply = onSyncNodeAppendEntriesReply;
return pSyncNode;
}
void syncNodeClose(SSyncNode* pSyncNode) {
assert(pSyncNode != NULL);
raftClose(pSyncNode->pRaft);
free(pSyncNode);
}
static int32_t doSyncNodePing(struct SSyncNode* ths, const SyncPing* pMsg) {
int32_t ret = ths->pRaft->FpPing(ths->pRaft, pMsg);
return ret;
}
static int32_t onSyncNodePing(struct SSyncNode* ths, SyncPing* pMsg) {
int32_t ret = ths->pRaft->FpOnPing(ths->pRaft, pMsg);
return ret;
}
static int32_t onSyncNodePingReply(struct SSyncNode* ths, SyncPingReply* pMsg) {
int32_t ret = ths->pRaft->FpOnPingReply(ths->pRaft, pMsg);
return ret;
}
static int32_t doSyncNodeRequestVote(struct SSyncNode* ths, const SyncRequestVote* pMsg) {
int32_t ret = ths->pRaft->FpRequestVote(ths->pRaft, pMsg);
return ret;
}
static int32_t onSyncNodeRequestVote(struct SSyncNode* ths, SyncRequestVote* pMsg) {
int32_t ret = ths->pRaft->FpOnRequestVote(ths->pRaft, pMsg);
return ret;
}
static int32_t onSyncNodeRequestVoteReply(struct SSyncNode* ths, SyncRequestVoteReply* pMsg) {
int32_t ret = ths->pRaft->FpOnRequestVoteReply(ths->pRaft, pMsg);
return ret;
}
static int32_t doSyncNodeAppendEntries(struct SSyncNode* ths, const SyncAppendEntries* pMsg) {
int32_t ret = ths->pRaft->FpAppendEntries(ths->pRaft, pMsg);
return ret;
}
static int32_t onSyncNodeAppendEntries(struct SSyncNode* ths, SyncAppendEntries* pMsg) {
int32_t ret = ths->pRaft->FpOnAppendEntries(ths->pRaft, pMsg);
return ret;
}
static int32_t onSyncNodeAppendEntriesReply(struct SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
int32_t ret = ths->pRaft->FpOnAppendEntriesReply(ths->pRaft, pMsg);
return ret;
}
\ No newline at end of file
...@@ -16,12 +16,12 @@ ...@@ -16,12 +16,12 @@
#include "syncRaft.h" #include "syncRaft.h"
#include "sync.h" #include "sync.h"
SRaft* raftCreate(SRaftId raftId, void* data) { SRaft* raftOpen(SRaftId raftId, SSyncFSM* pFsm) {
SRaft* pRaft = (SRaft*)malloc(sizeof(SRaft)); SRaft* pRaft = (SRaft*)malloc(sizeof(SRaft));
assert(pRaft != NULL); assert(pRaft != NULL);
pRaft->id = raftId; pRaft->id = raftId;
pRaft->data = data; pRaft->pFsm = pFsm;
pRaft->FpPing = doRaftPing; pRaft->FpPing = doRaftPing;
pRaft->FpOnPing = onRaftPing; pRaft->FpOnPing = onRaftPing;
...@@ -38,6 +38,11 @@ SRaft* raftCreate(SRaftId raftId, void* data) { ...@@ -38,6 +38,11 @@ SRaft* raftCreate(SRaftId raftId, void* data) {
return pRaft; return pRaft;
} }
void raftClose(SRaft* pRaft) {
assert(pRaft != NULL);
free(pRaft);
}
static int32_t doRaftPing(struct SRaft* ths, const RaftPing* pMsg) { return 0; } static int32_t doRaftPing(struct SRaft* ths, const RaftPing* pMsg) { return 0; }
static int32_t onRaftPing(struct SRaft* ths, RaftPing* pMsg) { return 0; } static int32_t onRaftPing(struct SRaft* ths, RaftPing* pMsg) { return 0; }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册