From 4845ca7f9921e2a2638b4eba639132d7a6fe06fd Mon Sep 17 00:00:00 2001 From: lichuang Date: Thu, 28 Oct 2021 09:56:20 +0800 Subject: [PATCH] [raft]refactor raft interface,add log store methods --- include/libs/sync/sync.h | 50 ++++++++++++++++++++++++++++++++-------- 1 file changed, 40 insertions(+), 10 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index a0602ec1b0..30583686c5 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -22,7 +22,6 @@ extern "C" { #include #include "taosdef.h" -#include "wal.h" typedef int64_t SyncNodeId; typedef int32_t SyncGroupId; @@ -41,6 +40,7 @@ typedef struct { } SSyncBuffer; typedef struct { + SyncNodeId nodeId; uint16_t nodePort; // node sync Port char nodeFqdn[TSDB_FQDN_LEN]; // node FQDN } SNodeInfo; @@ -83,11 +83,38 @@ typedef struct SSyncFSM { } SSyncFSM; +typedef struct SSyncLogStore { + void* pData; + + // write log with given index + int32_t (*logWrite)(struct SSyncLogStore* logStore, SyncIndex index, SSyncBuffer* pBuf); + + // mark log with given index has been commtted + int32_t (*logCommit)(struct SSyncLogStore* logStore, SyncIndex index); + + // prune log before given index + int32_t (*logPrune)(struct SSyncLogStore* logStore, SyncIndex index); + + // rollback log after given index + int32_t (*logRollback)(struct SSyncLogStore* logStore, SyncIndex index); +} SSyncLogStore; + typedef struct SSyncServerState { - SNodeInfo voteFor; + SyncNodeId voteFor; SSyncTerm term; } SSyncServerState; +typedef struct SSyncClusterConfig { + // Log index number of current cluster config. + SyncIndex index; + + // Log index number of previous cluster config. + SyncIndex prevIndex; + + // current cluster + const SSyncCluster* cluster; +} SSyncClusterConfig; + typedef struct SStateManager { void* pData; @@ -95,35 +122,38 @@ typedef struct SStateManager { const SSyncServerState* (*readServerState)(struct SStateManager* stateMng); - void (*saveCluster)(struct SStateManager* stateMng, const SSyncCluster* cluster); + void (*saveCluster)(struct SStateManager* stateMng, const SSyncClusterConfig* cluster); - const SSyncCluster* (*readCluster)(struct SStateManager* stateMng); + const SSyncClusterConfig* (*readCluster)(struct SStateManager* stateMng); } SStateManager; typedef struct { SyncGroupId vgId; - twalh walHandle; - SyncIndex snapshotIndex; SSyncCluster syncCfg; SSyncFSM fsm; + SSyncLogStore logStore; + SStateManager stateManager; } SSyncInfo; +struct SSyncNode; +typedef struct SSyncNode SSyncNode; + int32_t syncInit(); void syncCleanUp(); -SyncNodeId syncStart(const SSyncInfo*); +SSyncNode syncStart(const SSyncInfo*); void syncStop(SyncNodeId); -int32_t syncPropose(SyncNodeId nodeId, SSyncBuffer buffer, void* pData, bool isWeak); +int32_t syncPropose(SSyncNode syncNode, SSyncBuffer buffer, void* pData, bool isWeak); -int32_t syncAddNode(SyncNodeId nodeId, const SNodeInfo *pNode); +int32_t syncAddNode(SSyncNode syncNode, const SNodeInfo *pNode); -int32_t syncRemoveNode(SyncNodeId nodeId, const SNodeInfo *pNode); +int32_t syncRemoveNode(SSyncNode syncNode, const SNodeInfo *pNode); extern int32_t syncDebugFlag; -- GitLab