提交 324c5742 编写于 作者: M Minghao Li

TD-13476 add sync.h

上级 3dc9a178
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
......@@ -23,7 +23,7 @@ extern "C" {
#include <stdint.h>
#include "taosdef.h"
typedef int32_t SyncNodeId;
typedef uint64_t SyncNodeId;
typedef int32_t SyncGroupId;
typedef int64_t SyncIndex;
typedef uint64_t SyncTerm;
......@@ -46,109 +46,113 @@ typedef struct {
} SNodeInfo;
typedef struct {
int32_t selfIndex;
int32_t replica;
int32_t replicaNum;
SNodeInfo nodeInfo[TSDB_MAX_REPLICA];
} SSyncCluster;
} SSyncCfg;
typedef struct {
int32_t selfIndex;
int32_t replica;
int32_t replicaNum;
SNodeInfo nodeInfo[TSDB_MAX_REPLICA];
ESyncState role[TSDB_MAX_REPLICA];
} SNodesRole;
typedef struct SSyncFSM {
void* pData;
// apply committed log, bufs will be free by sync module
int32_t (*applyLog)(struct SSyncFSM* fsm, SyncIndex index, const SSyncBuffer* buf, void* pData);
// abstract definition of snapshot
typedef struct SSnapshot {
void* data;
SyncIndex lastApplyIndex;
} SSnapshot;
// cluster commit callback
int32_t (*onClusterChanged)(struct SSyncFSM* fsm, const SSyncCluster* cluster, void* pData);
typedef struct SSyncFSM {
void* data;
// fsm return snapshot in ppBuf, bufs will be free by sync module
// TODO: getSnapshot SHOULD be async?
int32_t (*getSnapshot)(struct SSyncFSM* fsm, SSyncBuffer** ppBuf, int32_t* objId, bool* isLast);
// when value in pBuf finish a raft flow, FpCommitCb is called, code indicates the result
// user can do something according to the code and isWeak. for example, write data into tsdb
void (*FpCommitCb)(struct SSyncFSM* pFsm, const SSyncBuffer* pBuf, SyncIndex index, bool isWeak, int32_t code);
// fsm apply snapshot with pBuf data
int32_t (*applySnapshot)(struct SSyncFSM* fsm, SSyncBuffer* pBuf, int32_t objId, bool isLast);
// when value in pBuf has been written into local log store, FpPreCommitCb is called, code indicates the result
// user can do something according to the code and isWeak. for example, write data into tsdb
void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SSyncBuffer* pBuf, SyncIndex index, bool isWeak, int32_t code);
// call when restore snapshot and log done
int32_t (*onRestoreDone)(struct SSyncFSM* fsm);
// when log entry is updated by a new one, FpRollBackCb is called
// user can do something to roll back. for example, delete data from tsdb, or just ignore it
void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SSyncBuffer* pBuf, SyncIndex index, bool isWeak, int32_t code);
void (*onRollback)(struct SSyncFSM* fsm, SyncIndex index, const SSyncBuffer* buf);
// user should implement this function, use "data" to take snapshot into "snapshot"
int32_t (*FpTakeSnapshot)(SSnapshot* snapshot);
void (*onRoleChanged)(struct SSyncFSM* fsm, const SNodesRole* pRole);
// user should implement this function, restore "data" from "snapshot"
int32_t (*FpRestoreSnapshot)(const SSnapshot* snapshot);
} SSyncFSM;
// abstract definition of log store in raft
// SWal implements it
typedef struct SSyncLogStore {
void* pData;
void* data;
// append one log entry
int32_t (*appendEntry)(struct SSyncLogStore* pLogStore, SSyncBuffer* pBuf);
// write log with given index
int32_t (*logWrite)(struct SSyncLogStore* logStore, SyncIndex index, SSyncBuffer* pBuf);
// get one log entry, user need to free pBuf->data
int32_t (*getEntry)(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncBuffer* pBuf);
* read log from given index(included) with limit, return the actual num in nBuf,
* pBuf will be free in sync module
int32_t (*logRead)(struct SSyncLogStore* logStore, SyncIndex index, int limit,
SSyncBuffer* pBuf, int* nBuf);
// update log store commit index with "index"
int32_t (*updateCommitIndex)(struct SSyncLogStore* pLogStore, SyncIndex index);
// mark log with given index has been commtted
int32_t (*logCommit)(struct SSyncLogStore* logStore, SyncIndex index);
// truncate log with index, entries after the given index (>index) will be deleted
int32_t (*truncate)(struct SSyncLogStore* pLogStore, SyncIndex index);
// prune log before given index(not included)
int32_t (*logPrune)(struct SSyncLogStore* logStore, SyncIndex index);
// return commit index of log
SyncIndex (*getCommitIndex)(struct SSyncLogStore* pLogStore);
// rollback log after given index(included)
int32_t (*logRollback)(struct SSyncLogStore* logStore, SyncIndex index);
// return index of last entry
SyncIndex (*getLastIndex)(struct SSyncLogStore* pLogStore);
// return term of last entry
SyncTerm (*getLastTerm)(struct SSyncLogStore* pLogStore);
// return last index of log
SyncIndex (*logLastIndex)(struct SSyncLogStore* logStore);
} SSyncLogStore;
typedef struct SStateManager {
void* pData;
// raft need to persist two variables in storage: currentTerm, voteFor
typedef struct SStateMgr {
void* data;
// save serialized server state data, buffer will be free by Sync
int32_t (*saveServerState)(struct SStateManager* stateMng, const char* buffer, int n);
int32_t (*getCurrentTerm)(struct SStateMgr* pMgr, SyncTerm* pCurrentTerm);
int32_t (*persistCurrentTerm)(struct SStateMgr* pMgr, SyncTerm pCurrentTerm);
// read serialized server state data, buffer will be free by Sync
int32_t (*readServerState)(struct SStateManager* stateMng, char** ppBuffer, int* n);
int32_t (*getVoteFor)(struct SStateMgr* pMgr, SyncNodeId* pVoteFor);
int32_t (*persistVoteFor)(struct SStateMgr* pMgr, SyncNodeId voteFor);
// save serialized cluster state data, buffer will be free by Sync
void (*saveClusterState)(struct SStateManager* stateMng, const char* buffer, int n);
int32_t (*getSyncCfg)(struct SStateMgr* pMgr, SSyncCfg* pSyncCfg);
int32_t (*persistSyncCfg)(struct SStateMgr* pMgr, SSyncCfg* pSyncCfg);
// read serialized cluster state data, buffer will be free by Sync
int32_t (*readClusterState)(struct SStateManager* stateMng, char** ppBuffer, int* n);
} SStateManager;
} SStateMgr;
typedef struct {
SyncGroupId vgId;
SyncIndex appliedIndex;
SSyncCluster syncCfg;
SSyncFSM fsm;
SSyncCfg syncCfg;
SSyncLogStore logStore;
SStateManager stateManager;
SStateMgr stateManager;
SSyncFSM syncFsm;
} SSyncInfo;
struct SSyncNode;
typedef struct SSyncNode SSyncNode;
// will be defined in syncInt.h, here just for complie
typedef struct SSyncNode {
} SSyncNode;
int32_t syncInit();
void syncCleanUp();
SSyncNode* syncStart(const SSyncInfo*);
void syncReconfig(const SSyncNode*, const SSyncCluster*);
void syncStop(const SSyncNode*);
int32_t syncPropose(SSyncNode* syncNode, const SSyncBuffer* pBuf, void* pData, bool isWeak);
int64_t syncStart(const SSyncInfo*);
void syncStop(int64_t rid);
int32_t syncReconfig(int64_t rid, const SSyncCfg*);
int32_t syncAddNode(SSyncNode syncNode, const SNodeInfo *pNode);
// int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak);
int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak);
int32_t syncRemoveNode(SSyncNode syncNode, const SNodeInfo *pNode);
ESyncState syncGetMyRole(int64_t rid);
void syncGetNodesRole(int64_t rid, SNodesRole*);
extern int32_t sDebugFlag;
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册