diff --git a/include/libs/raft/raft.h b/include/libs/raft/raft.h index 1eedc0f4b85d6c68f308435b8d223e60057d4df5..5b7f93276b01f314a57513a75613658e4d78f055 100644 --- a/include/libs/raft/raft.h +++ b/include/libs/raft/raft.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019 TAOS Data, Inc. + * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 @@ -13,15 +13,149 @@ * along with this program. If not, see . */ -#ifndef _TD_RAFT_H_ -#define _TD_RAFT_H_ +#ifndef TD_RAFT_H +#define TD_RAFT_H #ifdef __cplusplus extern "C" { #endif +#include +#include "taosdef.h" + +typedef unsigned int RaftId; +typedef unsigned int RaftGroupId; + +// buffer holding data +typedef struct RaftBuffer { + void* data; + size_t len; +} RaftBuffer; + +// a single server information in a cluster +typedef struct RaftServer { + RaftId id; + char fqdn[TSDB_FQDN_LEN]; + uint16_t port; +} RaftServer; + +// all servers in a cluster +typedef struct RaftConfiguration { + RaftServer *servers; + int nServer; +} RaftConfiguration; + +// raft lib module +struct Raft; +typedef struct Raft Raft; + +struct RaftNode; +typedef struct RaftNode RaftNode; + +// raft state machine +struct RaftFSM; +typedef struct RaftFSM { + // statemachine user data + void *data; + + // apply buffer data, bufs will be free by raft module + int (*apply)(struct RaftFSM *fsm, const RaftBuffer *bufs[], int nBufs); + + // configuration commit callback + int (*onConfigurationCommit)(const RaftConfiguration* cluster); + + // fsm return snapshot in ppBuf, bufs will be free by raft module + // TODO: getSnapshot SHOULD be async? + int (*getSnapshot)(struct RaftFSM *fsm, RaftBuffer **ppBuf); + + // fsm restore with pBuf data + int (*restore)(struct RaftFSM *fsm, RaftBuffer *pBuf); + + // fsm send data in buf to server,buf will be free by raft module + int (*send)(struct RaftFSM* fsm, const RaftServer* server, const RaftBuffer *buf); +} RaftFSM; + +typedef struct RaftNodeOptions { + // user define state machine + RaftFSM* pFSM; + + // election timeout(in ms) + // by default: 1000 + int electionTimeoutMS; + + // heart timeout(in ms) + // by default: 100 + int heartbeatTimeoutMS; + + // install snapshot timeout(in ms) + int installSnapshotTimeoutMS; + + /** + * number of log entries before starting a new snapshot. + * by default: 1024 + */ + int snapshotThreshold; + + /** + * Number of log entries to keep in the log after a snapshot has + * been taken. + * by default: 128. + */ + int snapshotTrailing; + + /** + * Enable or disable pre-vote support. + * by default: false + */ + bool preVote; + +} RaftNodeOptions; + +// create raft lib +int RaftCreate(Raft** ppRaft); + +int RaftDestroy(Raft* pRaft); + +// start a raft node with options,node id,group id +int RaftStart(Raft* pRaft, + RaftId selfId, + RaftGroupId selfGroupId, + const RaftConfiguration* cluster, + const RaftNodeOptions* options, + RaftNode **ppNode); + +// stop a raft node +int RaftStop(RaftNode* pNode); + +// client apply a cmd in buf +typedef void (*RaftApplyFp)(const RaftBuffer *pBuf, int result); + +int RaftApply(RaftNode *pNode, + const RaftBuffer *pBuf, + RaftApplyFp applyCb); + +// recv data from other servers in cluster,buf will be free in raft +int RaftRecv(RaftNode *pNode, const RaftBuffer* pBuf); + +// change cluster servers API +typedef void (*RaftChangeFp)(const RaftServer* pServer, int result); + +int RaftAddServer(RaftNode *pNode, + const RaftServer* pServer, + RaftChangeFp changeCb); + +int RaftRemoveServer(RaftNode *pNode, + const RaftServer* pServer, + RaftChangeFp changeCb); + +// transfer leader to id +typedef void (*RaftTransferFp)(RaftId id, int result); +int RaftTransfer(RaftNode *pNode, + RaftId id, + RaftTransferFp transferCb); + #ifdef __cplusplus } #endif -#endif /*_TD_RAFT_H_*/ \ No newline at end of file +#endif /* TD_RAFT_H */ \ No newline at end of file diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index e59d60f7dc2f7e816cd302cfd24be164098d781f..37cd2637839337291a99bb638f47bb0b33a9f54a 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -60,8 +60,8 @@ void walClose(twalh); //write int64_t walWrite(twalh, int8_t msgType, void* body, uint32_t bodyLen); void walFsync(twalh, bool forceHint); -//int32_t walCommit(twalh, uint64_t ver); -//int32_t walRollback(twalh, uint64_t ver); +//int32_t walCommit(twalh, int64_t ver); +//int32_t walRollback(twalh, int64_t ver); //read int32_t walRead(twalh, SWalHead **, int64_t ver); diff --git a/include/server/vnode/tq/tq.h b/include/server/vnode/tq/tq.h index 91688e890df0c699de8f9e0ab4004d5bbd3da182..dd355c838142f129fef8e4ea62a7e2cfababe441 100644 --- a/include/server/vnode/tq/tq.h +++ b/include/server/vnode/tq/tq.h @@ -25,23 +25,14 @@ extern "C" { typedef struct STQ STQ; STQ* tqInit(void* ref_func(void*), void* unref_func(void*)); -void tqCleanUp(STQ* pTq); - -//create persistent storage for meta info such as consuming offset -//return value > 0: cgId -//return value <= 0: error code -int tqCreateGroup(STQ*); -//create ring buffer in memory and load consuming offset -int tqOpenGroup(STQ*, int cgId); -//destroy ring buffer and persist consuming offset -int tqCloseGroup(STQ*, int cgId); -//delete persistent storage for meta info -int tqDropGroup(STQ*, int cgId); - -int tqPushMsg(STQ*, void *, int64_t version); +void tqCleanUp(STQ*); + +//void* will be replace by a msg type +int tqPushMsg(STQ*, void* msg, int64_t version); int tqCommit(STQ*); -int tqHandleMsg(STQ*, void *msg); +//void* will be replace by a msg type +int tqHandleMsg(STQ*, void* msg); #ifdef __cplusplus } diff --git a/source/server/vnode/tq/inc/tqInt.h b/source/server/vnode/tq/inc/tqInt.h index 416a915456e78b1746e4a3cc5a4cde5336b42f97..a51f0b03af766533cdbe90948be0e27d978ba276 100644 --- a/source/server/vnode/tq/inc/tqInt.h +++ b/source/server/vnode/tq/inc/tqInt.h @@ -25,6 +25,17 @@ extern "C" { //implement the array index //implement the ring buffer +//create persistent storage for meta info such as consuming offset +//return value > 0: cgId +//return value <= 0: error code +int tqCreateGroup(STQ*); +//create ring buffer in memory and load consuming offset +int tqOpenGroup(STQ*, int cgId); +//destroy ring buffer and persist consuming offset +int tqCloseGroup(STQ*, int cgId); +//delete persistent storage for meta info +int tqDropGroup(STQ*, int cgId); + #ifdef __cplusplus } #endif diff --git a/src/raft/CMakeLists.txt b/src/raft/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..3dc66010383ed97ec4c0c5b28170158e5b031a1d --- /dev/null +++ b/src/raft/CMakeLists.txt @@ -0,0 +1,7 @@ +aux_source_directory(source RAFT_SRC) +add_library(raft ${RAFT_SRC}) +target_include_directories( + raft + PUBLIC "${CMAKE_SOURCE_DIR}/include/raft" + PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/include" +) \ No newline at end of file