From 4afcb4387c99c42550d7120fa6359e9d88e4580c Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Sun, 27 Feb 2022 10:22:15 +0800 Subject: [PATCH] add sync io --- include/libs/sync/sync.h | 3 +++ source/libs/sync/inc/syncIO.h | 14 ++++++++++---- source/libs/sync/inc/syncInt.h | 4 ++-- source/libs/sync/inc/syncRaftStore.h | 1 - source/libs/sync/src/syncIO.c | 10 +++++++--- source/libs/sync/src/syncMain.c | 4 ++-- source/libs/sync/test/syncPingTest.cpp | 8 ++++++++ 7 files changed, 32 insertions(+), 12 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 41e1491aec..a619e66622 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -21,7 +21,9 @@ extern "C" { #endif #include +#include #include "taosdef.h" +#include "trpc.h" typedef uint64_t SyncNodeId; typedef int32_t SyncGroupId; @@ -133,6 +135,7 @@ typedef struct SSyncInfo { SSyncCfg syncCfg; char path[TSDB_FILENAME_LEN]; SSyncFSM* pFsm; + int32_t (*FpSendMsg)(void* handle, const SEpSet* pEpSet, SRpcMsg* pMsg); } SSyncInfo; diff --git a/source/libs/sync/inc/syncIO.h b/source/libs/sync/inc/syncIO.h index 54a3d2b8c1..4b788efd79 100644 --- a/source/libs/sync/inc/syncIO.h +++ b/source/libs/sync/inc/syncIO.h @@ -45,20 +45,26 @@ typedef struct SSyncIO { int32_t (*start)(struct SSyncIO *ths); int32_t (*stop)(struct SSyncIO *ths); int32_t (*ping)(struct SSyncIO *ths); - int32_t (*onMessage)(struct SSyncIO *ths, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); + + int32_t (*onMsg)(struct SSyncIO *ths, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); int32_t (*destroy)(struct SSyncIO *ths); + void *pSyncNode; + int32_t (*FpOnPing)(struct SSyncNode *ths, SyncPing *pMsg); + } SSyncIO; -int32_t syncIOStart(); -int32_t syncIOStop(); +extern SSyncIO *gSyncIO; +int32_t syncIOStart(); +int32_t syncIOStop(); +int32_t syncIOSendMsg(void *handle, const SEpSet *pEpSet, SRpcMsg *pMsg); SSyncIO *syncIOCreate(); static int32_t doSyncIOStart(SSyncIO *io); static int32_t doSyncIOStop(SSyncIO *io); static int32_t doSyncIOPing(SSyncIO *io); -static int32_t doSyncIOOnMessage(struct SSyncIO *io, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); +static int32_t doSyncIOOnMsg(struct SSyncIO *io, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); static int32_t doSyncIODestroy(SSyncIO *io); #ifdef __cplusplus diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 9bd8606ee6..ad8484662a 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -115,6 +115,8 @@ typedef struct SSyncNode { int32_t (*FpOnAppendEntriesReply)(struct SSyncNode* ths, SyncAppendEntriesReply* pMsg); + int32_t (*FpSendMsg)(void* handle, const SEpSet* pEpSet, SRpcMsg* pMsg); + } SSyncNode; SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo); @@ -139,8 +141,6 @@ static int32_t onSyncNodeAppendEntries(struct SSyncNode* ths, SyncAppendEntries* static int32_t onSyncNodeAppendEntriesReply(struct SSyncNode* ths, SyncAppendEntriesReply* pMsg); - - #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncRaftStore.h b/source/libs/sync/inc/syncRaftStore.h index 610f0c2487..bdaeb81aee 100644 --- a/source/libs/sync/inc/syncRaftStore.h +++ b/source/libs/sync/inc/syncRaftStore.h @@ -54,7 +54,6 @@ int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len); void raftStorePrint(SRaftStore *pRaftStore); - #ifdef __cplusplus } #endif diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index 0e32d9ac50..bb20d11e37 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -20,6 +20,10 @@ #include "ttimer.h" #include "tutil.h" +SSyncIO *gSyncIO = NULL; + +int32_t syncIOSendMsg(void *handle, const SEpSet *pEpSet, SRpcMsg *pMsg) { return 0; } + int32_t syncIOStart() { return 0; } int32_t syncIOStop() { return 0; } @@ -121,7 +125,7 @@ SSyncIO *syncIOCreate() { io->start = doSyncIOStart; io->stop = doSyncIOStop; io->ping = doSyncIOPing; - io->onMessage = doSyncIOOnMessage; + io->onMsg = doSyncIOOnMsg; io->destroy = doSyncIODestroy; return io; @@ -215,7 +219,7 @@ static int32_t doSyncIOPing(SSyncIO *io) { return 0; } -static int32_t doSyncIOOnMessage(struct SSyncIO *io, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { return 0; } +static int32_t doSyncIOOnMsg(struct SSyncIO *io, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { return 0; } static int32_t doSyncIODestroy(SSyncIO *io) { int8_t start = atomic_load_8(&io->isStart); @@ -242,4 +246,4 @@ static int32_t doSyncIODestroy(SSyncIO *io) { } return 0; -} \ No newline at end of file +} diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 1e13c6125e..bd2952505e 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -38,11 +38,12 @@ ESyncState syncGetMyRole(int64_t rid) { return TAOS_SYNC_STATE_LEADER; } void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole) {} - SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { SSyncNode* pSyncNode = (SSyncNode*)malloc(sizeof(SSyncNode)); assert(pSyncNode != NULL); + pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg; + pSyncNode->FpPing = doSyncNodePing; pSyncNode->FpOnPing = onSyncNodePing; pSyncNode->FpOnPingReply = onSyncNodePingReply; @@ -56,7 +57,6 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { return pSyncNode; } - void syncNodeClose(SSyncNode* pSyncNode) { assert(pSyncNode != NULL); raftClose(pSyncNode->pRaft); diff --git a/source/libs/sync/test/syncPingTest.cpp b/source/libs/sync/test/syncPingTest.cpp index 436a25cb5e..b69b102b54 100644 --- a/source/libs/sync/test/syncPingTest.cpp +++ b/source/libs/sync/test/syncPingTest.cpp @@ -14,8 +14,13 @@ void logTest() { } void doSync() { + SSyncFSM* pFsm; + SSyncInfo syncInfo; syncInfo.vgId = 1; + syncInfo.FpSendMsg = syncIOSendMsg; + syncInfo.pFsm = pFsm; + snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_sync_ping"); SSyncCfg* pCfg = &syncInfo.syncCfg; pCfg->myIndex = 0; @@ -32,6 +37,9 @@ void doSync() { SSyncNode* pSyncNode = syncNodeOpen(&syncInfo); assert(pSyncNode != NULL); + + gSyncIO->FpOnPing = pSyncNode->FpOnPing; + gSyncIO->pSyncNode = pSyncNode; } int main() { -- GitLab