提交 4afcb438 编写于 作者: M Minghao Li

add sync io

上级 b55cf2bc
...@@ -21,7 +21,9 @@ extern "C" { ...@@ -21,7 +21,9 @@ extern "C" {
#endif #endif
#include <stdint.h> #include <stdint.h>
#include <tep.h>
#include "taosdef.h" #include "taosdef.h"
#include "trpc.h"
typedef uint64_t SyncNodeId; typedef uint64_t SyncNodeId;
typedef int32_t SyncGroupId; typedef int32_t SyncGroupId;
...@@ -133,6 +135,7 @@ typedef struct SSyncInfo { ...@@ -133,6 +135,7 @@ typedef struct SSyncInfo {
SSyncCfg syncCfg; SSyncCfg syncCfg;
char path[TSDB_FILENAME_LEN]; char path[TSDB_FILENAME_LEN];
SSyncFSM* pFsm; SSyncFSM* pFsm;
int32_t (*FpSendMsg)(void* handle, const SEpSet* pEpSet, SRpcMsg* pMsg);
} SSyncInfo; } SSyncInfo;
......
...@@ -45,20 +45,26 @@ typedef struct SSyncIO { ...@@ -45,20 +45,26 @@ typedef struct SSyncIO {
int32_t (*start)(struct SSyncIO *ths); int32_t (*start)(struct SSyncIO *ths);
int32_t (*stop)(struct SSyncIO *ths); int32_t (*stop)(struct SSyncIO *ths);
int32_t (*ping)(struct SSyncIO *ths); int32_t (*ping)(struct SSyncIO *ths);
int32_t (*onMessage)(struct SSyncIO *ths, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t (*onMsg)(struct SSyncIO *ths, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t (*destroy)(struct SSyncIO *ths); int32_t (*destroy)(struct SSyncIO *ths);
void *pSyncNode;
int32_t (*FpOnPing)(struct SSyncNode *ths, SyncPing *pMsg);
} SSyncIO; } SSyncIO;
extern SSyncIO *gSyncIO;
int32_t syncIOStart(); int32_t syncIOStart();
int32_t syncIOStop(); int32_t syncIOStop();
int32_t syncIOSendMsg(void *handle, const SEpSet *pEpSet, SRpcMsg *pMsg);
SSyncIO *syncIOCreate(); SSyncIO *syncIOCreate();
static int32_t doSyncIOStart(SSyncIO *io); static int32_t doSyncIOStart(SSyncIO *io);
static int32_t doSyncIOStop(SSyncIO *io); static int32_t doSyncIOStop(SSyncIO *io);
static int32_t doSyncIOPing(SSyncIO *io); static int32_t doSyncIOPing(SSyncIO *io);
static int32_t doSyncIOOnMessage(struct SSyncIO *io, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); static int32_t doSyncIOOnMsg(struct SSyncIO *io, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
static int32_t doSyncIODestroy(SSyncIO *io); static int32_t doSyncIODestroy(SSyncIO *io);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -115,6 +115,8 @@ typedef struct SSyncNode { ...@@ -115,6 +115,8 @@ typedef struct SSyncNode {
int32_t (*FpOnAppendEntriesReply)(struct SSyncNode* ths, SyncAppendEntriesReply* pMsg); int32_t (*FpOnAppendEntriesReply)(struct SSyncNode* ths, SyncAppendEntriesReply* pMsg);
int32_t (*FpSendMsg)(void* handle, const SEpSet* pEpSet, SRpcMsg* pMsg);
} SSyncNode; } SSyncNode;
SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo); SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo);
...@@ -139,8 +141,6 @@ static int32_t onSyncNodeAppendEntries(struct SSyncNode* ths, SyncAppendEntries* ...@@ -139,8 +141,6 @@ static int32_t onSyncNodeAppendEntries(struct SSyncNode* ths, SyncAppendEntries*
static int32_t onSyncNodeAppendEntriesReply(struct SSyncNode* ths, SyncAppendEntriesReply* pMsg); static int32_t onSyncNodeAppendEntriesReply(struct SSyncNode* ths, SyncAppendEntriesReply* pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -54,7 +54,6 @@ int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len); ...@@ -54,7 +54,6 @@ int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len);
void raftStorePrint(SRaftStore *pRaftStore); void raftStorePrint(SRaftStore *pRaftStore);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -20,6 +20,10 @@ ...@@ -20,6 +20,10 @@
#include "ttimer.h" #include "ttimer.h"
#include "tutil.h" #include "tutil.h"
SSyncIO *gSyncIO = NULL;
int32_t syncIOSendMsg(void *handle, const SEpSet *pEpSet, SRpcMsg *pMsg) { return 0; }
int32_t syncIOStart() { return 0; } int32_t syncIOStart() { return 0; }
int32_t syncIOStop() { return 0; } int32_t syncIOStop() { return 0; }
...@@ -121,7 +125,7 @@ SSyncIO *syncIOCreate() { ...@@ -121,7 +125,7 @@ SSyncIO *syncIOCreate() {
io->start = doSyncIOStart; io->start = doSyncIOStart;
io->stop = doSyncIOStop; io->stop = doSyncIOStop;
io->ping = doSyncIOPing; io->ping = doSyncIOPing;
io->onMessage = doSyncIOOnMessage; io->onMsg = doSyncIOOnMsg;
io->destroy = doSyncIODestroy; io->destroy = doSyncIODestroy;
return io; return io;
...@@ -215,7 +219,7 @@ static int32_t doSyncIOPing(SSyncIO *io) { ...@@ -215,7 +219,7 @@ static int32_t doSyncIOPing(SSyncIO *io) {
return 0; return 0;
} }
static int32_t doSyncIOOnMessage(struct SSyncIO *io, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { return 0; } static int32_t doSyncIOOnMsg(struct SSyncIO *io, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { return 0; }
static int32_t doSyncIODestroy(SSyncIO *io) { static int32_t doSyncIODestroy(SSyncIO *io) {
int8_t start = atomic_load_8(&io->isStart); int8_t start = atomic_load_8(&io->isStart);
......
...@@ -38,11 +38,12 @@ ESyncState syncGetMyRole(int64_t rid) { return TAOS_SYNC_STATE_LEADER; } ...@@ -38,11 +38,12 @@ ESyncState syncGetMyRole(int64_t rid) { return TAOS_SYNC_STATE_LEADER; }
void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole) {} void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole) {}
SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
SSyncNode* pSyncNode = (SSyncNode*)malloc(sizeof(SSyncNode)); SSyncNode* pSyncNode = (SSyncNode*)malloc(sizeof(SSyncNode));
assert(pSyncNode != NULL); assert(pSyncNode != NULL);
pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg;
pSyncNode->FpPing = doSyncNodePing; pSyncNode->FpPing = doSyncNodePing;
pSyncNode->FpOnPing = onSyncNodePing; pSyncNode->FpOnPing = onSyncNodePing;
pSyncNode->FpOnPingReply = onSyncNodePingReply; pSyncNode->FpOnPingReply = onSyncNodePingReply;
...@@ -56,7 +57,6 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { ...@@ -56,7 +57,6 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
return pSyncNode; return pSyncNode;
} }
void syncNodeClose(SSyncNode* pSyncNode) { void syncNodeClose(SSyncNode* pSyncNode) {
assert(pSyncNode != NULL); assert(pSyncNode != NULL);
raftClose(pSyncNode->pRaft); raftClose(pSyncNode->pRaft);
......
...@@ -14,8 +14,13 @@ void logTest() { ...@@ -14,8 +14,13 @@ void logTest() {
} }
void doSync() { void doSync() {
SSyncFSM* pFsm;
SSyncInfo syncInfo; SSyncInfo syncInfo;
syncInfo.vgId = 1; syncInfo.vgId = 1;
syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_sync_ping");
SSyncCfg* pCfg = &syncInfo.syncCfg; SSyncCfg* pCfg = &syncInfo.syncCfg;
pCfg->myIndex = 0; pCfg->myIndex = 0;
...@@ -32,6 +37,9 @@ void doSync() { ...@@ -32,6 +37,9 @@ void doSync() {
SSyncNode* pSyncNode = syncNodeOpen(&syncInfo); SSyncNode* pSyncNode = syncNodeOpen(&syncInfo);
assert(pSyncNode != NULL); assert(pSyncNode != NULL);
gSyncIO->FpOnPing = pSyncNode->FpOnPing;
gSyncIO->pSyncNode = pSyncNode;
} }
int main() { int main() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册