diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 283604508ff0131dad79bed8ce34e3d810362ec7..00ba1120e713bf38ec371ecc380db394e92d6b96 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019 TAOS Data, Inc. + * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 @@ -23,7 +23,7 @@ extern "C" { #include #include "taosdef.h" -typedef int32_t SyncNodeId; +typedef uint64_t SyncNodeId; typedef int32_t SyncGroupId; typedef int64_t SyncIndex; typedef uint64_t SyncTerm; @@ -46,109 +46,113 @@ typedef struct { } SNodeInfo; typedef struct { - int32_t selfIndex; - int32_t replica; + int32_t replicaNum; SNodeInfo nodeInfo[TSDB_MAX_REPLICA]; -} SSyncCluster; +} SSyncCfg; typedef struct { - int32_t selfIndex; - int32_t replica; - SNodeInfo node[TSDB_MAX_REPLICA]; + int32_t replicaNum; + SNodeInfo nodeInfo[TSDB_MAX_REPLICA]; ESyncState role[TSDB_MAX_REPLICA]; } SNodesRole; -typedef struct SSyncFSM { - void* pData; - - // apply committed log, bufs will be free by sync module - int32_t (*applyLog)(struct SSyncFSM* fsm, SyncIndex index, const SSyncBuffer* buf, void* pData); +// abstract definition of snapshot +typedef struct SSnapshot { + void* data; + SyncIndex lastApplyIndex; +} SSnapshot; - // cluster commit callback - int32_t (*onClusterChanged)(struct SSyncFSM* fsm, const SSyncCluster* cluster, void* pData); +typedef struct SSyncFSM { + void* data; - // fsm return snapshot in ppBuf, bufs will be free by sync module - // TODO: getSnapshot SHOULD be async? - int32_t (*getSnapshot)(struct SSyncFSM* fsm, SSyncBuffer** ppBuf, int32_t* objId, bool* isLast); + // when value in pBuf finish a raft flow, FpCommitCb is called, code indicates the result + // user can do something according to the code and isWeak. 
for example, write data into tsdb + void (*FpCommitCb)(struct SSyncFSM* pFsm, const SSyncBuffer* pBuf, SyncIndex index, bool isWeak, int32_t code); - // fsm apply snapshot with pBuf data - int32_t (*applySnapshot)(struct SSyncFSM* fsm, SSyncBuffer* pBuf, int32_t objId, bool isLast); + // when value in pBuf has been written into local log store, FpPreCommitCb is called, code indicates the result + // user can do something according to the code and isWeak. for example, write data into tsdb + void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SSyncBuffer* pBuf, SyncIndex index, bool isWeak, int32_t code); - // call when restore snapshot and log done - int32_t (*onRestoreDone)(struct SSyncFSM* fsm); + // when log entry is updated by a new one, FpRollBackCb is called + // user can do something to roll back. for example, delete data from tsdb, or just ignore it + void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SSyncBuffer* pBuf, SyncIndex index, bool isWeak, int32_t code); - void (*onRollback)(struct SSyncFSM* fsm, SyncIndex index, const SSyncBuffer* buf); + // user should implement this function, use "data" to take snapshot into "snapshot" + int32_t (*FpTakeSnapshot)(SSnapshot* snapshot); - void (*onRoleChanged)(struct SSyncFSM* fsm, const SNodesRole* pRole); + // user should implement this function, restore "data" from "snapshot" + int32_t (*FpRestoreSnapshot)(const SSnapshot* snapshot); } SSyncFSM; +// abstract definition of log store in raft +// SWal implements it typedef struct SSyncLogStore { - void* pData; + void* data; + + // append one log entry + int32_t (*appendEntry)(struct SSyncLogStore* pLogStore, SSyncBuffer* pBuf); - // write log with given index - int32_t (*logWrite)(struct SSyncLogStore* logStore, SyncIndex index, SSyncBuffer* pBuf); + // get one log entry, user need to free pBuf->data + int32_t (*getEntry)(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncBuffer* pBuf); - /** - * read log from given index(included) with limit, return the actual 
num in nBuf, - * pBuf will be free in sync module - **/ - int32_t (*logRead)(struct SSyncLogStore* logStore, SyncIndex index, int limit, - SSyncBuffer* pBuf, int* nBuf); + // update log store commit index with "index" + int32_t (*updateCommitIndex)(struct SSyncLogStore* pLogStore, SyncIndex index); - // mark log with given index has been commtted - int32_t (*logCommit)(struct SSyncLogStore* logStore, SyncIndex index); + // truncate log with index, entries after the given index (>index) will be deleted + int32_t (*truncate)(struct SSyncLogStore* pLogStore, SyncIndex index); - // prune log before given index(not included) - int32_t (*logPrune)(struct SSyncLogStore* logStore, SyncIndex index); + // return commit index of log + SyncIndex (*getCommitIndex)(struct SSyncLogStore* pLogStore); - // rollback log after given index(included) - int32_t (*logRollback)(struct SSyncLogStore* logStore, SyncIndex index); + // return index of last entry + SyncIndex (*getLastIndex)(struct SSyncLogStore* pLogStore); + + // return term of last entry + SyncTerm (*getLastTerm)(struct SSyncLogStore* pLogStore); - // return last index of log - SyncIndex (*logLastIndex)(struct SSyncLogStore* logStore); } SSyncLogStore; -typedef struct SStateManager { - void* pData; +// raft need to persist two variables in storage: currentTerm, voteFor +typedef struct SStateMgr { + void* data; - // save serialized server state data, buffer will be free by Sync - int32_t (*saveServerState)(struct SStateManager* stateMng, const char* buffer, int n); + int32_t (*getCurrentTerm)(struct SStateMgr* pMgr, SyncTerm* pCurrentTerm); + int32_t (*persistCurrentTerm)(struct SStateMgr* pMgr, SyncTerm pCurrentTerm); - // read serialized server state data, buffer will be free by Sync - int32_t (*readServerState)(struct SStateManager* stateMng, char** ppBuffer, int* n); + int32_t (*getVoteFor)(struct SStateMgr* pMgr, SyncNodeId* pVoteFor); + int32_t (*persistVoteFor)(struct SStateMgr* pMgr, SyncNodeId voteFor); - // save 
serialized cluster state data, buffer will be free by Sync - void (*saveClusterState)(struct SStateManager* stateMng, const char* buffer, int n); + int32_t (*getSyncCfg)(struct SStateMgr* pMgr, SSyncCfg* pSyncCfg); + int32_t (*persistSyncCfg)(struct SStateMgr* pMgr, SSyncCfg* pSyncCfg); - // read serialized cluster state data, buffer will be free by Sync - int32_t (*readClusterState)(struct SStateManager* stateMng, char** ppBuffer, int* n); -} SStateManager; +} SStateMgr; typedef struct { SyncGroupId vgId; - SyncIndex appliedIndex; - SSyncCluster syncCfg; - SSyncFSM fsm; + SSyncCfg syncCfg; SSyncLogStore logStore; - SStateManager stateManager; + SStateMgr stateManager; + SSyncFSM syncFsm; + } SSyncInfo; -struct SSyncNode; -typedef struct SSyncNode SSyncNode; +// will be defined in syncInt.h, declared here only so that this header compiles +typedef struct SSyncNode { +} SSyncNode; int32_t syncInit(); void syncCleanUp(); -SSyncNode* syncStart(const SSyncInfo*); -void syncReconfig(const SSyncNode*, const SSyncCluster*); -void syncStop(const SSyncNode*); - -int32_t syncPropose(SSyncNode* syncNode, const SSyncBuffer* pBuf, void* pData, bool isWeak); +int64_t syncStart(const SSyncInfo*); +void syncStop(int64_t rid); +int32_t syncReconfig(int64_t rid, const SSyncCfg*); -int32_t syncAddNode(SSyncNode syncNode, const SNodeInfo *pNode); +// int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak); +int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak); -int32_t syncRemoveNode(SSyncNode syncNode, const SNodeInfo *pNode); +ESyncState syncGetMyRole(int64_t rid); +void syncGetNodesRole(int64_t rid, SNodesRole*); extern int32_t sDebugFlag;