diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 0ec741ec3e2ab30f7079ae6c7b266b65406c714b..a619e66622dfc61b4d8ba504d53e60313b0ef806 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -21,7 +21,9 @@ extern "C" { #endif #include +#include #include "taosdef.h" +#include "trpc.h" typedef uint64_t SyncNodeId; typedef int32_t SyncGroupId; @@ -34,23 +36,23 @@ typedef enum { TAOS_SYNC_STATE_LEADER = 2, } ESyncState; -typedef struct { +typedef struct SSyncBuffer { void* data; size_t len; } SSyncBuffer; -typedef struct { - SyncNodeId nodeId; - uint16_t nodePort; // node sync Port - char nodeFqdn[TSDB_FQDN_LEN]; // node FQDN +typedef struct SNodeInfo { + uint16_t nodePort; // node sync Port + char nodeFqdn[TSDB_FQDN_LEN]; // node FQDN } SNodeInfo; -typedef struct { +typedef struct SSyncCfg { int32_t replicaNum; + int32_t myIndex; SNodeInfo nodeInfo[TSDB_MAX_REPLICA]; } SSyncCfg; -typedef struct { +typedef struct SNodesRole { int32_t replicaNum; SNodeInfo nodeInfo[TSDB_MAX_REPLICA]; ESyncState role[TSDB_MAX_REPLICA]; @@ -128,12 +130,12 @@ typedef struct SStateMgr { } SStateMgr; -typedef struct { - SyncGroupId vgId; - SSyncCfg syncCfg; - SSyncLogStore logStore; - SStateMgr stateManager; - SSyncFSM syncFsm; +typedef struct SSyncInfo { + SyncGroupId vgId; + SSyncCfg syncCfg; + char path[TSDB_FILENAME_LEN]; + SSyncFSM* pFsm; + int32_t (*FpSendMsg)(void* handle, const SEpSet* pEpSet, SRpcMsg* pMsg); } SSyncInfo; diff --git a/source/libs/sync/CMakeLists.txt b/source/libs/sync/CMakeLists.txt index 37ee5194c81d337b68e92df096c42a4721ec93eb..cb38d7e3637a4cf11dff6ddc5d3e91354ad81587 100644 --- a/source/libs/sync/CMakeLists.txt +++ b/source/libs/sync/CMakeLists.txt @@ -13,4 +13,8 @@ target_include_directories( sync PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/sync" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" -) \ No newline at end of file +) + +if(${BUILD_TEST}) + add_subdirectory(test) +endif(${BUILD_TEST}) diff --git a/source/libs/sync/inc/syncAppendEntries.h b/source/libs/sync/inc/syncAppendEntries.h index 9ca0de19c5fbecb6476d281658a4a5e2f5980451..b7c1c051cc3301e40c2024ee8dfee52fd941b075 100644 --- a/source/libs/sync/inc/syncAppendEntries.h +++ b/source/libs/sync/inc/syncAppendEntries.h @@ -23,6 +23,7 @@ extern "C" { #include #include #include +#include "syncInt.h" #include "syncMessage.h" #include "syncRaft.h" #include "taosdef.h" diff --git a/source/libs/sync/inc/syncAppendEntriesReply.h b/source/libs/sync/inc/syncAppendEntriesReply.h index 8b5cbf1da53c9ff29e925eafe5ae912a9e28d396..22f8eb464fecb97d93ef45798f4d79fe7e4114db 100644 --- a/source/libs/sync/inc/syncAppendEntriesReply.h +++ b/source/libs/sync/inc/syncAppendEntriesReply.h @@ -23,6 +23,7 @@ extern "C" { #include #include #include +#include "syncInt.h" #include "syncMessage.h" #include "syncRaft.h" #include "taosdef.h" diff --git a/source/libs/sync/inc/syncElection.h b/source/libs/sync/inc/syncElection.h index 34dfdb3d0952f1e4ce996223393ade44f787f4a3..7e9e637854f29f35650761e4411357099396d9d4 100644 --- a/source/libs/sync/inc/syncElection.h +++ b/source/libs/sync/inc/syncElection.h @@ -23,6 +23,7 @@ extern "C" { #include #include #include +#include "syncInt.h" #include "taosdef.h" #ifdef __cplusplus diff --git a/source/libs/sync/inc/syncEnv.h b/source/libs/sync/inc/syncEnv.h new file mode 100644 index 0000000000000000000000000000000000000000..f1c4327b692dfbf1c9066d10894dc827196047fd --- /dev/null +++ b/source/libs/sync/inc/syncEnv.h @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_LIBS_SYNC_ENV_H +#define _TD_LIBS_SYNC_ENV_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include +#include "syncInt.h" +#include "taosdef.h" +#include "trpc.h" + +typedef struct SSyncEnv { + void *pTimer; + void *pTimerManager; +} SSyncEnv; + +int32_t syncEnvStart(); + +int32_t syncEnvStop(); + +static int32_t doSyncEnvStart(SSyncEnv *pSyncEnv); + +static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_LIBS_SYNC_ENV_H*/ diff --git a/source/libs/sync/inc/syncIO.h b/source/libs/sync/inc/syncIO.h new file mode 100644 index 0000000000000000000000000000000000000000..4b788efd797fc3b722742da9ae4360496dc2e0b8 --- /dev/null +++ b/source/libs/sync/inc/syncIO.h @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_LIBS_IO_H +#define _TD_LIBS_IO_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include +#include "os.h" +#include "syncInt.h" +#include "taosdef.h" +#include "tqueue.h" +#include "trpc.h" + +typedef struct SSyncIO { + void * serverRpc; + void * clientRpc; + STaosQueue *pMsgQ; + STaosQset * pQset; + pthread_t tid; + int8_t isStart; + + SEpSet epSet; + + void *syncTimer; + void *syncTimerManager; + + int32_t (*start)(struct SSyncIO *ths); + int32_t (*stop)(struct SSyncIO *ths); + int32_t (*ping)(struct SSyncIO *ths); + + int32_t (*onMsg)(struct SSyncIO *ths, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); + int32_t (*destroy)(struct SSyncIO *ths); + + void *pSyncNode; + int32_t (*FpOnPing)(struct SSyncNode *ths, SyncPing *pMsg); + +} SSyncIO; + +extern SSyncIO *gSyncIO; + +int32_t syncIOStart(); +int32_t syncIOStop(); +int32_t syncIOSendMsg(void *handle, const SEpSet *pEpSet, SRpcMsg *pMsg); +SSyncIO *syncIOCreate(); + +static int32_t doSyncIOStart(SSyncIO *io); +static int32_t doSyncIOStop(SSyncIO *io); +static int32_t doSyncIOPing(SSyncIO *io); +static int32_t doSyncIOOnMsg(struct SSyncIO *io, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); +static int32_t doSyncIODestroy(SSyncIO *io); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_LIBS_IO_H*/ diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 551ce83122c519190be0b494bc76d2fafe72b7e6..ad8484662aea70ebb262582e9f5b637af19384eb 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -23,7 +23,11 @@ extern "C" { #include #include #include +#include "sync.h" #include "taosdef.h" +#include "tlog.h" + +extern int32_t sDebugFlag; #define sFatal(...) \ { \ @@ -62,16 +66,81 @@ extern "C" { } \ } +struct SRaft; +typedef struct SRaft SRaft; + +struct SyncPing; +typedef struct SyncPing SyncPing; + +struct SyncPingReply; +typedef struct SyncPingReply SyncPingReply; + +struct SyncRequestVote; +typedef struct SyncRequestVote SyncRequestVote; + +struct SyncRequestVoteReply; +typedef struct SyncRequestVoteReply SyncRequestVoteReply; + +struct SyncAppendEntries; +typedef struct SyncAppendEntries SyncAppendEntries; + +struct SyncAppendEntriesReply; +typedef struct SyncAppendEntriesReply SyncAppendEntriesReply; + typedef struct SSyncNode { - char path[TSDB_FILENAME_LEN]; - int8_t replica; - int8_t quorum; - int8_t selfIndex; - uint32_t vgId; - int32_t refCount; - int64_t rid; + int8_t replica; + int8_t quorum; + + SyncGroupId vgId; + SSyncCfg syncCfg; + char path[TSDB_FILENAME_LEN]; + + SRaft* pRaft; + + int32_t (*FpPing)(struct SSyncNode* ths, const SyncPing* pMsg); + + int32_t (*FpOnPing)(struct SSyncNode* ths, SyncPing* pMsg); + + int32_t (*FpOnPingReply)(struct SSyncNode* ths, SyncPingReply* pMsg); + + int32_t (*FpRequestVote)(struct SSyncNode* ths, const SyncRequestVote* pMsg); + + int32_t (*FpOnRequestVote)(struct SSyncNode* ths, SyncRequestVote* pMsg); + + int32_t (*FpOnRequestVoteReply)(struct SSyncNode* ths, SyncRequestVoteReply* pMsg); + + int32_t (*FpAppendEntries)(struct SSyncNode* ths, const SyncAppendEntries* pMsg); + + int32_t (*FpOnAppendEntries)(struct SSyncNode* ths, SyncAppendEntries* pMsg); + + int32_t (*FpOnAppendEntriesReply)(struct SSyncNode* ths, SyncAppendEntriesReply* pMsg); + + int32_t (*FpSendMsg)(void* handle, const SEpSet* pEpSet, SRpcMsg* pMsg); + } SSyncNode; +SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo); + +void syncNodeClose(SSyncNode* pSyncNode); + +static int32_t doSyncNodePing(struct SSyncNode* ths, const SyncPing* pMsg); + +static int32_t onSyncNodePing(struct SSyncNode* ths, SyncPing* pMsg); + +static int32_t onSyncNodePingReply(struct SSyncNode* ths, SyncPingReply* pMsg); + +static int32_t doSyncNodeRequestVote(struct SSyncNode* ths, const SyncRequestVote* pMsg); + +static int32_t onSyncNodeRequestVote(struct SSyncNode* ths, SyncRequestVote* pMsg); + +static int32_t onSyncNodeRequestVoteReply(struct SSyncNode* ths, SyncRequestVoteReply* pMsg); + +static int32_t doSyncNodeAppendEntries(struct SSyncNode* ths, const SyncAppendEntries* pMsg); + +static int32_t onSyncNodeAppendEntries(struct SSyncNode* ths, SyncAppendEntries* pMsg); + +static int32_t onSyncNodeAppendEntriesReply(struct SSyncNode* ths, SyncAppendEntriesReply* pMsg); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index f410c8cf6ebf8f71b4585a72568afc88f80ac15d..2ee5e0109c447f37a87eff474d2255cb33f1bde7 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -41,26 +41,26 @@ typedef enum ESyncMessageType { typedef struct SyncPing { ESyncMessageType msgType; const SSyncBuffer *pData; -} SyncPing; +} SyncPing, RaftPing; typedef struct SyncPingReply { ESyncMessageType msgType; const SSyncBuffer *pData; -} SyncPingReply; +} SyncPingReply, RaftPingReply; typedef struct SyncClientRequest { ESyncMessageType msgType; const SSyncBuffer *pData; int64_t seqNum; bool isWeak; -} SyncClientRequest; +} SyncClientRequest, RaftClientRequest; typedef struct SyncClientRequestReply { ESyncMessageType msgType; int32_t errCode; const SSyncBuffer *pErrMsg; const SSyncBuffer *pLeaderHint; -} SyncClientRequestReply; +} SyncClientRequestReply, RaftClientRequestReply; typedef struct SyncRequestVote { ESyncMessageType msgType; @@ -69,7 +69,7 @@ typedef struct SyncRequestVote { SyncGroupId vgId; SyncIndex lastLogIndex; SyncTerm lastLogTerm; -} SyncRequestVote; +} SyncRequestVote, RaftRequestVote; typedef struct SyncRequestVoteReply { ESyncMessageType msgType; @@ -77,7 +77,7 @@ typedef struct SyncRequestVoteReply { SyncNodeId nodeId; SyncGroupId vgId; bool voteGranted; -} SyncRequestVoteReply; +} SyncRequestVoteReply, RaftRequestVoteReply; typedef struct SyncAppendEntries { ESyncMessageType msgType; @@ -88,7 +88,7 @@ typedef struct SyncAppendEntries { int32_t entryCount; SSyncRaftEntry * logEntries; SyncIndex commitIndex; -} SyncAppendEntries; +} SyncAppendEntries, RaftAppendEntries; typedef struct SyncAppendEntriesReply { ESyncMessageType msgType; @@ -96,7 +96,7 @@ typedef struct SyncAppendEntriesReply { SyncNodeId nodeId; bool success; SyncIndex matchIndex; -} SyncAppendEntriesReply; +} SyncAppendEntriesReply, RaftAppendEntriesReply; #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncRaft.h b/source/libs/sync/inc/syncRaft.h index 0c7e573572230e640094f734ca716886c26ac40c..a247a29fc4dbfe6050433e29c0dbe84483f0e04a 100644 --- a/source/libs/sync/inc/syncRaft.h +++ b/source/libs/sync/inc/syncRaft.h @@ -28,14 +28,56 @@ extern "C" { #include "taosdef.h" typedef struct SRaftId { - SyncNodeId nodeId; + SyncNodeId addr; SyncGroupId vgId; } SRaftId; typedef struct SRaft { - SRaftId id; + SRaftId id; + SSyncFSM* pFsm; + + int32_t (*FpPing)(struct SRaft* ths, const RaftPing* pMsg); + + int32_t (*FpOnPing)(struct SRaft* ths, RaftPing* pMsg); + + int32_t (*FpOnPingReply)(struct SRaft* ths, RaftPingReply* pMsg); + + int32_t (*FpRequestVote)(struct SRaft* ths, const RaftRequestVote* pMsg); + + int32_t (*FpOnRequestVote)(struct SRaft* ths, RaftRequestVote* pMsg); + + int32_t (*FpOnRequestVoteReply)(struct SRaft* ths, RaftRequestVoteReply* pMsg); + + int32_t (*FpAppendEntries)(struct SRaft* ths, const RaftAppendEntries* pMsg); + + int32_t (*FpOnAppendEntries)(struct SRaft* ths, RaftAppendEntries* pMsg); + + int32_t (*FpOnAppendEntriesReply)(struct SRaft* ths, RaftAppendEntriesReply* pMsg); + } SRaft; +SRaft* raftOpen(SRaftId raftId, SSyncFSM* pFsm); + +void raftClose(SRaft* pRaft); + +static int32_t doRaftPing(struct SRaft* ths, const RaftPing* pMsg); + +static int32_t onRaftPing(struct SRaft* ths, RaftPing* pMsg); + +static int32_t onRaftPingReply(struct SRaft* ths, RaftPingReply* pMsg); + +static int32_t doRaftRequestVote(struct SRaft* ths, const RaftRequestVote* pMsg); + +static int32_t onRaftRequestVote(struct SRaft* ths, RaftRequestVote* pMsg); + +static int32_t onRaftRequestVoteReply(struct SRaft* ths, RaftRequestVoteReply* pMsg); + +static int32_t doRaftAppendEntries(struct SRaft* ths, const RaftAppendEntries* pMsg); + +static int32_t onRaftAppendEntries(struct SRaft* ths, RaftAppendEntries* pMsg); + +static int32_t onRaftAppendEntriesReply(struct SRaft* ths, RaftAppendEntriesReply* pMsg); + int32_t raftPropose(SRaft* pRaft, const SSyncBuffer* pBuf, bool isWeak); static int raftSendMsg(SRaftId destRaftId, const void* pMsg, const SRaft* pRaft); diff --git a/source/libs/sync/inc/syncRaftEntry.h b/source/libs/sync/inc/syncRaftEntry.h index adc82f2c5d9f2db8d7f1cd3f734b7bb129f6173a..516bef4d48c4f9b26d202f6999c1eec78644b056 100644 --- a/source/libs/sync/inc/syncRaftEntry.h +++ b/source/libs/sync/inc/syncRaftEntry.h @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#ifndef _TD_LIBS_TPL_H -#define _TD_LIBS_TPL_H +#ifndef _TD_LIBS_SYNC_RAFT_ENTRY_H +#define _TD_LIBS_SYNC_RAFT_ENTRY_H #ifdef __cplusplus extern "C" { @@ -23,7 +23,7 @@ extern "C" { #include #include #include -#include "sync.h" +#include "syncInt.h" #include "taosdef.h" typedef struct SSyncRaftEntry { @@ -37,4 +37,4 @@ typedef struct SSyncRaftEntry { } #endif -#endif /*_TD_LIBS_TPL_H*/ +#endif /*_TD_LIBS_SYNC_RAFT_ENTRY_H*/ diff --git a/source/libs/sync/inc/syncRaftLog.h b/source/libs/sync/inc/syncRaftLog.h index 8c4b5116ea17b2c1c57d0156a165d3bab845594e..ee971062cf7ed5140856f0c5d4483adf157d9d0a 100644 --- a/source/libs/sync/inc/syncRaftLog.h +++ b/source/libs/sync/inc/syncRaftLog.h @@ -23,7 +23,7 @@ extern "C" { #include #include #include -#include "sync.h" +#include "syncInt.h" #include "taosdef.h" int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncBuffer* pBuf); diff --git a/source/libs/sync/inc/syncRaftStore.h b/source/libs/sync/inc/syncRaftStore.h index 4cb852f34a5d962824a91e8b7ca666bcb1f130c0..0fdbd7a15084193ec5deef4bc1ae2b7ff008c71a 100644 --- a/source/libs/sync/inc/syncRaftStore.h +++ b/source/libs/sync/inc/syncRaftStore.h @@ -23,17 +23,36 @@ extern "C" { #include #include #include -#include "sync.h" +#include "cJSON.h" +#include "syncInt.h" #include "syncRaft.h" #include "taosdef.h" -int32_t currentTerm(SyncTerm *pCurrentTerm); +#define RAFT_STORE_BLOCK_SIZE 512 +#define RAFT_STORE_PATH_LEN 128 -int32_t persistCurrentTerm(SyncTerm currentTerm); +typedef struct SRaftStore { + SyncTerm currentTerm; + SRaftId voteFor; + //FileFd fd; + char path[RAFT_STORE_PATH_LEN]; +} SRaftStore; -int32_t voteFor(SRaftId *pRaftId); +SRaftStore *raftStoreOpen(const char *path); -int32_t persistVoteFor(SRaftId *pRaftId); +static int32_t raftStoreInit(SRaftStore *pRaftStore); + +int32_t raftStoreClose(SRaftStore *pRaftStore); + +int32_t raftStorePersist(SRaftStore *pRaftStore); + +static bool raftStoreFileExist(char *path); + +int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len); + +int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len); + +void raftStorePrint(SRaftStore *pRaftStore); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncReplication.h b/source/libs/sync/inc/syncReplication.h index 40c5ff790b6c30ee141633cabd3d730275a026b8..7f97ae9e49cb516dbbc04f082a4068165208bd6b 100644 --- a/source/libs/sync/inc/syncReplication.h +++ b/source/libs/sync/inc/syncReplication.h @@ -23,6 +23,7 @@ extern "C" { #include #include #include +#include "syncInt.h" #include "taosdef.h" #ifdef __cplusplus diff --git a/source/libs/sync/inc/syncRequestVote.h b/source/libs/sync/inc/syncRequestVote.h index 3ff96bbe8fad9f80db711e556ec841f7b6dd3082..c2eca55151e24aceae235e7232faf03bb5784894 100644 --- a/source/libs/sync/inc/syncRequestVote.h +++ b/source/libs/sync/inc/syncRequestVote.h @@ -23,6 +23,7 @@ extern "C" { #include #include #include +#include "syncInt.h" #include "syncMessage.h" #include "syncRaft.h" #include "taosdef.h" diff --git a/source/libs/sync/inc/syncRequestVoteReply.h b/source/libs/sync/inc/syncRequestVoteReply.h index 033ac89bc2f4d0f74afd4a497e77a098b5934159..38068dd0e2db38750a352b4546cf19c49e5eb09b 100644 --- a/source/libs/sync/inc/syncRequestVoteReply.h +++ b/source/libs/sync/inc/syncRequestVoteReply.h @@ -23,6 +23,7 @@ extern "C" { #include #include #include +#include "syncInt.h" #include "syncMessage.h" #include "syncRaft.h" #include "taosdef.h" diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index 3b6121578ad99b55af5bc81bf9601a48b1391c68..89fcb230fbe06f83a44cdd178bece09f80c5b587 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -23,7 +23,7 @@ extern "C" { #include #include #include -#include "sync.h" +#include "syncInt.h" #include "syncRaft.h" #include "taosdef.h" diff --git a/source/libs/sync/inc/syncTimeout.h b/source/libs/sync/inc/syncTimeout.h index 8159d2566c740296ca58f6d1f8ba05d12d044a34..d9d6a1793917cd2e52275bc074d05523438a6f6b 100644 --- a/source/libs/sync/inc/syncTimeout.h +++ b/source/libs/sync/inc/syncTimeout.h @@ -23,6 +23,7 @@ extern "C" { #include #include #include +#include "syncInt.h" #include "syncMessage.h" #include "syncRaft.h" #include "taosdef.h" diff --git a/source/libs/sync/inc/syncVoteMgr.h b/source/libs/sync/inc/syncVoteMgr.h new file mode 100644 index 0000000000000000000000000000000000000000..cfcf58bee2199735ed6058d563012a6e1e364248 --- /dev/null +++ b/source/libs/sync/inc/syncVoteMgr.h @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_LIBS_SYNC_VOTG_MGR_H +#define _TD_LIBS_SYNC_VOTG_MGR_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include +#include "syncInt.h" +#include "taosdef.h" + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_LIBS_SYNC_VOTG_MGR_H*/ diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 1286108664270b81a4b5ae70f6e592dc67a6a5ad..2b9c59ec92a3b368426af4000b93eb6db1858727 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -14,104 +14,98 @@ */ #include "syncAppendEntries.h" -#include "sync.h" void appendEntries(SRaft *pRaft, const SyncAppendEntries *pMsg) { - -// TLA+ Spec -//AppendEntries(i, j) == -// /\ i /= j -// /\ state[i] = Leader -// /\ LET prevLogIndex == nextIndex[i][j] - 1 -// prevLogTerm == IF prevLogIndex > 0 THEN -// log[i][prevLogIndex].term -// ELSE -// 0 -// \* Send up to 1 entry, constrained by the end of the log. -// lastEntry == Min({Len(log[i]), nextIndex[i][j]}) -// entries == SubSeq(log[i], nextIndex[i][j], lastEntry) -// IN Send([mtype |-> AppendEntriesRequest, -// mterm |-> currentTerm[i], -// mprevLogIndex |-> prevLogIndex, -// mprevLogTerm |-> prevLogTerm, -// mentries |-> entries, -// \* mlog is used as a history variable for the proof. -// \* It would not exist in a real implementation. -// mlog |-> log[i], -// mcommitIndex |-> Min({commitIndex[i], lastEntry}), -// msource |-> i, -// mdest |-> j]) -// /\ UNCHANGED <> - + // TLA+ Spec + // AppendEntries(i, j) == + // /\ i /= j + // /\ state[i] = Leader + // /\ LET prevLogIndex == nextIndex[i][j] - 1 + // prevLogTerm == IF prevLogIndex > 0 THEN + // log[i][prevLogIndex].term + // ELSE + // 0 + // \* Send up to 1 entry, constrained by the end of the log. + // lastEntry == Min({Len(log[i]), nextIndex[i][j]}) + // entries == SubSeq(log[i], nextIndex[i][j], lastEntry) + // IN Send([mtype |-> AppendEntriesRequest, + // mterm |-> currentTerm[i], + // mprevLogIndex |-> prevLogIndex, + // mprevLogTerm |-> prevLogTerm, + // mentries |-> entries, + // \* mlog is used as a history variable for the proof. + // \* It would not exist in a real implementation. + // mlog |-> log[i], + // mcommitIndex |-> Min({commitIndex[i], lastEntry}), + // msource |-> i, + // mdest |-> j]) + // /\ UNCHANGED <> } void onAppendEntries(SRaft *pRaft, const SyncAppendEntries *pMsg) { - -// TLA+ Spec -//HandleAppendEntriesRequest(i, j, m) == -// LET logOk == \/ m.mprevLogIndex = 0 -// \/ /\ m.mprevLogIndex > 0 -// /\ m.mprevLogIndex <= Len(log[i]) -// /\ m.mprevLogTerm = log[i][m.mprevLogIndex].term -// IN /\ m.mterm <= currentTerm[i] -// /\ \/ /\ \* reject request -// \/ m.mterm < currentTerm[i] -// \/ /\ m.mterm = currentTerm[i] -// /\ state[i] = Follower -// /\ \lnot logOk -// /\ Reply([mtype |-> AppendEntriesResponse, -// mterm |-> currentTerm[i], -// msuccess |-> FALSE, -// mmatchIndex |-> 0, -// msource |-> i, -// mdest |-> j], -// m) -// /\ UNCHANGED <> -// \/ \* return to follower state -// /\ m.mterm = currentTerm[i] -// /\ state[i] = Candidate -// /\ state' = [state EXCEPT ![i] = Follower] -// /\ UNCHANGED <> -// \/ \* accept request -// /\ m.mterm = currentTerm[i] -// /\ state[i] = Follower -// /\ logOk -// /\ LET index == m.mprevLogIndex + 1 -// IN \/ \* already done with request -// /\ \/ m.mentries = << >> -// \/ /\ m.mentries /= << >> -// /\ Len(log[i]) >= index -// /\ log[i][index].term = m.mentries[1].term -// \* This could make our commitIndex decrease (for -// \* example if we process an old, duplicated request), -// \* but that doesn't really affect anything. -// /\ commitIndex' = [commitIndex EXCEPT ![i] = -// m.mcommitIndex] -// /\ Reply([mtype |-> AppendEntriesResponse, -// mterm |-> currentTerm[i], -// msuccess |-> TRUE, -// mmatchIndex |-> m.mprevLogIndex + -// Len(m.mentries), -// msource |-> i, -// mdest |-> j], -// m) -// /\ UNCHANGED <> -// \/ \* conflict: remove 1 entry -// /\ m.mentries /= << >> -// /\ Len(log[i]) >= index -// /\ log[i][index].term /= m.mentries[1].term -// /\ LET new == [index2 \in 1..(Len(log[i]) - 1) |-> -// log[i][index2]] -// IN log' = [log EXCEPT ![i] = new] -// /\ UNCHANGED <> -// \/ \* no conflict: append entry -// /\ m.mentries /= << >> -// /\ Len(log[i]) = m.mprevLogIndex -// /\ log' = [log EXCEPT ![i] = -// Append(log[i], m.mentries[1])] -// /\ UNCHANGED <> -// /\ UNCHANGED <> -// - - + // TLA+ Spec + // HandleAppendEntriesRequest(i, j, m) == + // LET logOk == \/ m.mprevLogIndex = 0 + // \/ /\ m.mprevLogIndex > 0 + // /\ m.mprevLogIndex <= Len(log[i]) + // /\ m.mprevLogTerm = log[i][m.mprevLogIndex].term + // IN /\ m.mterm <= currentTerm[i] + // /\ \/ /\ \* reject request + // \/ m.mterm < currentTerm[i] + // \/ /\ m.mterm = currentTerm[i] + // /\ state[i] = Follower + // /\ \lnot logOk + // /\ Reply([mtype |-> AppendEntriesResponse, + // mterm |-> currentTerm[i], + // msuccess |-> FALSE, + // mmatchIndex |-> 0, + // msource |-> i, + // mdest |-> j], + // m) + // /\ UNCHANGED <> + // \/ \* return to follower state + // /\ m.mterm = currentTerm[i] + // /\ state[i] = Candidate + // /\ state' = [state EXCEPT ![i] = Follower] + // /\ UNCHANGED <> + // \/ \* accept request + // /\ m.mterm = currentTerm[i] + // /\ state[i] = Follower + // /\ logOk + // /\ LET index == m.mprevLogIndex + 1 + // IN \/ \* already done with request + // /\ \/ m.mentries = << >> + // \/ /\ m.mentries /= << >> + // /\ Len(log[i]) >= index + // /\ log[i][index].term = m.mentries[1].term + // \* This could make our commitIndex decrease (for + // \* example if we process an old, duplicated request), + // \* but that doesn't really affect anything. + // /\ commitIndex' = [commitIndex EXCEPT ![i] = + // m.mcommitIndex] + // /\ Reply([mtype |-> AppendEntriesResponse, + // mterm |-> currentTerm[i], + // msuccess |-> TRUE, + // mmatchIndex |-> m.mprevLogIndex + + // Len(m.mentries), + // msource |-> i, + // mdest |-> j], + // m) + // /\ UNCHANGED <> + // \/ \* conflict: remove 1 entry + // /\ m.mentries /= << >> + // /\ Len(log[i]) >= index + // /\ log[i][index].term /= m.mentries[1].term + // /\ LET new == [index2 \in 1..(Len(log[i]) - 1) |-> + // log[i][index2]] + // IN log' = [log EXCEPT ![i] = new] + // /\ UNCHANGED <> + // \/ \* no conflict: append entry + // /\ m.mentries /= << >> + // /\ Len(log[i]) = m.mprevLogIndex + // /\ log' = [log EXCEPT ![i] = + // Append(log[i], m.mentries[1])] + // /\ UNCHANGED <> + // /\ UNCHANGED <> + // } diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 4a9055e172fce1aad4753410301c34f4c8823275..05734237b93de6ec43e85116e0d65640f0ccd942 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -14,21 +14,18 @@ */ #include "syncAppendEntriesReply.h" -#include "sync.h" void onAppendEntriesReply(SRaft *pRaft, const SyncAppendEntriesReply *pMsg) { - -// TLA+ Spec -//HandleAppendEntriesResponse(i, j, m) == -// /\ m.mterm = currentTerm[i] -// /\ \/ /\ m.msuccess \* successful -// /\ nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1] -// /\ matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex] -// \/ /\ \lnot m.msuccess \* not successful -// /\ nextIndex' = [nextIndex EXCEPT ![i][j] = -// Max({nextIndex[i][j] - 1, 1})] -// /\ UNCHANGED <> -// /\ Discard(m) -// /\ UNCHANGED <> - + // TLA+ Spec + // HandleAppendEntriesResponse(i, j, m) == + // /\ m.mterm = currentTerm[i] + // /\ \/ /\ m.msuccess \* successful + // /\ nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1] + // /\ matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex] + // \/ /\ \lnot m.msuccess \* not successful + // /\ nextIndex' = [nextIndex EXCEPT ![i][j] = + // Max({nextIndex[i][j] - 1, 1})] + // /\ UNCHANGED <> + // /\ Discard(m) + // /\ UNCHANGED <> } diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index 738fc4c5e129562d368451c3f9d1eb3315060bb4..329105e2a1214aba3cfb1bce2850f292f85811b8 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -13,4 +13,4 @@ * along with this program. If not, see . */ -#include "sync.h" +#include "syncElection.h" diff --git a/source/libs/sync/src/syncEnv.c b/source/libs/sync/src/syncEnv.c new file mode 100644 index 0000000000000000000000000000000000000000..e71cf55cb193c4716bba452f1e5601a64b559fad --- /dev/null +++ b/source/libs/sync/src/syncEnv.c @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "syncEnv.h" +#include + +SSyncEnv *gSyncEnv = NULL; + +int32_t syncEnvStart() { + int32_t ret; + gSyncEnv = (SSyncEnv *)malloc(sizeof(SSyncEnv)); + assert(gSyncEnv != NULL); + ret = doSyncEnvStart(gSyncEnv); + return ret; +} + +int32_t syncEnvStop() { + int32_t ret = doSyncEnvStop(gSyncEnv); + return ret; +} + +static int32_t doSyncEnvStart(SSyncEnv *pSyncEnv) { return 0; } + +static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv) { return 0; } diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c new file mode 100644 index 0000000000000000000000000000000000000000..bb20d11e37b3edc56f2958a5021f5881129e220d --- /dev/null +++ b/source/libs/sync/src/syncIO.c @@ -0,0 +1,249 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "syncIO.h" +#include +#include "syncOnMessage.h" +#include "tglobal.h" +#include "ttimer.h" +#include "tutil.h" + +SSyncIO *gSyncIO = NULL; + +int32_t syncIOSendMsg(void *handle, const SEpSet *pEpSet, SRpcMsg *pMsg) { return 0; } + +int32_t syncIOStart() { return 0; } + +int32_t syncIOStop() { return 0; } + +static void syncTick(void *param, void *tmrId) { + SSyncIO *io = (SSyncIO *)param; + sDebug("syncTick ... "); + + SRpcMsg rpcMsg; + rpcMsg.pCont = rpcMallocCont(10); + snprintf(rpcMsg.pCont, 10, "TICK"); + rpcMsg.contLen = 10; + rpcMsg.handle = NULL; + rpcMsg.msgType = 2; + + SRpcMsg *pTemp; + + pTemp = taosAllocateQitem(sizeof(SRpcMsg)); + memcpy(pTemp, &rpcMsg, sizeof(SRpcMsg)); + + taosWriteQitem(io->pMsgQ, pTemp); + + io->syncTimer = taosTmrStart(syncTick, 1000, io, io->syncTimerManager); +} + +void *syncConsumer(void *param) { + SSyncIO *io = param; + + STaosQall *qall; + SRpcMsg * pRpcMsg, rpcMsg; + int type; + + qall = taosAllocateQall(); + + while (1) { + int numOfMsgs = taosReadAllQitemsFromQset(io->pQset, qall, NULL, NULL); + sDebug("%d sync-io msgs are received", numOfMsgs); + if (numOfMsgs <= 0) break; + + for (int i = 0; i < numOfMsgs; ++i) { + taosGetQitem(qall, (void **)&pRpcMsg); + sDebug("sync-io recv type:%d msg:%s", pRpcMsg->msgType, (char *)(pRpcMsg->pCont)); + } + + taosResetQitems(qall); + for (int i = 0; i < numOfMsgs; ++i) { + taosGetQitem(qall, (void **)&pRpcMsg); + rpcFreeCont(pRpcMsg->pCont); + + if (pRpcMsg->handle != NULL) { + int msgSize = 128; + memset(&rpcMsg, 0, sizeof(rpcMsg)); + rpcMsg.pCont = rpcMallocCont(msgSize); + rpcMsg.contLen = msgSize; + rpcMsg.handle = pRpcMsg->handle; + rpcMsg.code = 0; + rpcSendResponse(&rpcMsg); + } + + taosFreeQitem(pRpcMsg); + } + } + + taosFreeQall(qall); + return NULL; +} + +static int retrieveAuthInfo(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey) { + // app shall retrieve the auth info based on meterID from DB or a data file + // demo code here only for simple demo + int ret = 0; + return ret; +} + +static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { + sDebug("processResponse ... "); + rpcFreeCont(pMsg->pCont); +} + +static void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { + SSyncIO *io = pParent; + SRpcMsg *pTemp; + + pTemp = taosAllocateQitem(sizeof(SRpcMsg)); + memcpy(pTemp, pMsg, sizeof(SRpcMsg)); + + sDebug("request is received, type:%d, contLen:%d, item:%p", pMsg->msgType, pMsg->contLen, pTemp); + taosWriteQitem(io->pMsgQ, pTemp); +} + +SSyncIO *syncIOCreate() { + SSyncIO *io = (SSyncIO *)malloc(sizeof(SSyncIO)); + memset(io, 0, sizeof(*io)); + + io->pMsgQ = taosOpenQueue(); + io->pQset = taosOpenQset(); + taosAddIntoQset(io->pQset, io->pMsgQ, NULL); + + io->start = doSyncIOStart; + io->stop = doSyncIOStop; + io->ping = doSyncIOPing; + io->onMsg = doSyncIOOnMsg; + io->destroy = doSyncIODestroy; + + return io; +} + +static int32_t doSyncIOStart(SSyncIO *io) { + taosBlockSIGPIPE(); + + tsRpcForceTcp = 1; + + // cient rpc init + { + SRpcInit rpcInit; + memset(&rpcInit, 0, sizeof(rpcInit)); + rpcInit.localPort = 0; + rpcInit.label = "SYNC-IO-CLIENT"; + rpcInit.numOfThreads = 1; + rpcInit.cfp = processResponse; + rpcInit.sessions = 100; + rpcInit.idleTime = 100; + rpcInit.user = "sync-io"; + rpcInit.secret = "sync-io"; + rpcInit.ckey = "key"; + rpcInit.spi = 0; + rpcInit.connType = TAOS_CONN_CLIENT; + + io->clientRpc = rpcOpen(&rpcInit); + if (io->clientRpc == NULL) { + sError("failed to initialize RPC"); + return -1; + } + } + + // server rpc init + { + SRpcInit rpcInit; + memset(&rpcInit, 0, sizeof(rpcInit)); + rpcInit.localPort = 38000; + rpcInit.label = "SYNC-IO-SERVER"; + rpcInit.numOfThreads = 1; + rpcInit.cfp = processRequestMsg; + rpcInit.sessions = 1000; + rpcInit.idleTime = 2 * 1500; + rpcInit.afp = retrieveAuthInfo; + rpcInit.parent = io; + rpcInit.connType = TAOS_CONN_SERVER; + + void *pRpc = rpcOpen(&rpcInit); + if (pRpc == NULL) { + sError("failed to start RPC server"); + return -1; + } + } + + io->epSet.inUse = 0; + addEpIntoEpSet(&io->epSet, "127.0.0.1", 38000); + + // start consumer thread + { + if (pthread_create(&io->tid, NULL, syncConsumer, io) != 0) { + sError("failed to create sync consumer thread since %s", strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + } + + // start tmr thread + io->syncTimerManager = taosTmrInit(1000, 50, 10000, "SYNC"); + io->syncTimer = taosTmrStart(syncTick, 1000, io, io->syncTimerManager); + + return 0; +} + +static int32_t doSyncIOStop(SSyncIO *io) { + atomic_store_8(&io->isStart, 0); + pthread_join(io->tid, NULL); + return 0; +} + +static int32_t doSyncIOPing(SSyncIO *io) { + SRpcMsg rpcMsg, rspMsg; + + rpcMsg.pCont = rpcMallocCont(10); + snprintf(rpcMsg.pCont, 10, "ping"); + rpcMsg.contLen = 10; + rpcMsg.handle = NULL; + rpcMsg.msgType = 1; + + rpcSendRequest(io->clientRpc, &io->epSet, &rpcMsg, NULL); + + return 0; +} + +static int32_t doSyncIOOnMsg(struct SSyncIO *io, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { return 0; } + +static int32_t doSyncIODestroy(SSyncIO *io) { + int8_t start = atomic_load_8(&io->isStart); + assert(start == 0); + + if (io->serverRpc != NULL) { + free(io->serverRpc); + io->serverRpc = NULL; + } + + if (io->clientRpc != NULL) { + free(io->clientRpc); + io->clientRpc = NULL; + } + + if (io->pMsgQ != NULL) { + free(io->pMsgQ); + io->pMsgQ = NULL; + } + + if (io->pQset != NULL) { + free(io->pQset); + io->pQset = NULL; + } + + return 0; +} diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index fbb969eb1c91b6d0641dd1ff8d02b49f4ebe428c..bd2952505ed1e28b36da0333dc31e97eae3ae4eb 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -16,12 +16,17 @@ #include #include "sync.h" #include "syncInt.h" +#include "syncRaft.h" int32_t syncInit() { return 0; } void syncCleanUp() {} -int64_t syncStart(const SSyncInfo* pSyncInfo) { return 0; } +int64_t syncStart(const SSyncInfo* pSyncInfo) { + SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo); + assert(pSyncNode != NULL); + return 0; +} void syncStop(int64_t rid) {} @@ -31,4 +36,74 @@ int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak) { r ESyncState syncGetMyRole(int64_t rid) { return TAOS_SYNC_STATE_LEADER; } -void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole) {} \ No newline at end of file +void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole) {} + +SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { + SSyncNode* pSyncNode = (SSyncNode*)malloc(sizeof(SSyncNode)); + assert(pSyncNode != NULL); + + pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg; + + pSyncNode->FpPing = doSyncNodePing; + pSyncNode->FpOnPing = onSyncNodePing; + pSyncNode->FpOnPingReply = onSyncNodePingReply; + pSyncNode->FpRequestVote = doSyncNodeRequestVote; + pSyncNode->FpOnRequestVote = onSyncNodeRequestVote; + pSyncNode->FpOnRequestVoteReply = onSyncNodeRequestVoteReply; + pSyncNode->FpAppendEntries = doSyncNodeAppendEntries; + pSyncNode->FpOnAppendEntries = onSyncNodeAppendEntries; + pSyncNode->FpOnAppendEntriesReply = onSyncNodeAppendEntriesReply; + + return pSyncNode; +} + +void syncNodeClose(SSyncNode* pSyncNode) { + assert(pSyncNode != NULL); + raftClose(pSyncNode->pRaft); + free(pSyncNode); +} + +static int32_t doSyncNodePing(struct SSyncNode* ths, const SyncPing* pMsg) { + int32_t ret = ths->pRaft->FpPing(ths->pRaft, pMsg); + return ret; +} + +static int32_t onSyncNodePing(struct SSyncNode* ths, SyncPing* pMsg) { + int32_t ret = ths->pRaft->FpOnPing(ths->pRaft, pMsg); + return ret; +} + +static int32_t onSyncNodePingReply(struct SSyncNode* ths, SyncPingReply* pMsg) { + int32_t ret = ths->pRaft->FpOnPingReply(ths->pRaft, pMsg); + return ret; +} + +static int32_t doSyncNodeRequestVote(struct SSyncNode* ths, const SyncRequestVote* pMsg) { + int32_t ret = ths->pRaft->FpRequestVote(ths->pRaft, pMsg); + return ret; +} + +static int32_t onSyncNodeRequestVote(struct SSyncNode* ths, SyncRequestVote* pMsg) { + int32_t ret = ths->pRaft->FpOnRequestVote(ths->pRaft, pMsg); + return ret; +} + +static int32_t onSyncNodeRequestVoteReply(struct SSyncNode* ths, SyncRequestVoteReply* pMsg) { + int32_t ret = ths->pRaft->FpOnRequestVoteReply(ths->pRaft, pMsg); + return ret; +} + +static int32_t doSyncNodeAppendEntries(struct SSyncNode* ths, const SyncAppendEntries* pMsg) { + int32_t ret = ths->pRaft->FpAppendEntries(ths->pRaft, pMsg); + return ret; +} + +static int32_t onSyncNodeAppendEntries(struct SSyncNode* ths, SyncAppendEntries* pMsg) { + int32_t ret = ths->pRaft->FpOnAppendEntries(ths->pRaft, pMsg); + return ret; +} + +static int32_t onSyncNodeAppendEntriesReply(struct SSyncNode* ths, SyncAppendEntriesReply* pMsg) { + int32_t ret = ths->pRaft->FpOnAppendEntriesReply(ths->pRaft, pMsg); + return ret; +} \ No newline at end of file diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index dcfc940f76b37570f6052ef47bf15aa87dbb698f..89373037250aaf22d0c3f0ef06d8f9c27409d59c 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -14,7 +14,6 @@ */ #include "syncMessage.h" -#include "sync.h" #include "syncRaft.h" void onMessage(SRaft *pRaft, void *pMsg) {} \ No newline at end of file diff --git a/source/libs/sync/src/syncOnMessage.c b/source/libs/sync/src/syncOnMessage.c index 738fc4c5e129562d368451c3f9d1eb3315060bb4..19a97ee1566d32ed0fc8786c4d75542c2c435f30 100644 --- a/source/libs/sync/src/syncOnMessage.c +++ b/source/libs/sync/src/syncOnMessage.c @@ -13,4 +13,4 @@ * along with this program. If not, see . */ -#include "sync.h" +#include "syncOnMessage.h" diff --git a/source/libs/sync/src/syncRaft.c b/source/libs/sync/src/syncRaft.c index 85c2c6fe27ba70b4ecdac85c5d31aac165ce0a00..9f139730d102f4d6ab414938042711af996d65df 100644 --- a/source/libs/sync/src/syncRaft.c +++ b/source/libs/sync/src/syncRaft.c @@ -16,6 +16,51 @@ #include "syncRaft.h" #include "sync.h" +SRaft* raftOpen(SRaftId raftId, SSyncFSM* pFsm) { + SRaft* pRaft = (SRaft*)malloc(sizeof(SRaft)); + assert(pRaft != NULL); + + pRaft->id = raftId; + pRaft->pFsm = pFsm; + + pRaft->FpPing = doRaftPing; + pRaft->FpOnPing = onRaftPing; + pRaft->FpOnPingReply = onRaftPingReply; + + pRaft->FpRequestVote = doRaftRequestVote; + pRaft->FpOnRequestVote = onRaftRequestVote; + pRaft->FpOnRequestVoteReply = onRaftRequestVoteReply; + + pRaft->FpAppendEntries = doRaftAppendEntries; + pRaft->FpOnAppendEntries = onRaftAppendEntries; + pRaft->FpOnAppendEntriesReply = onRaftAppendEntriesReply; + + return pRaft; +} + +void raftClose(SRaft* pRaft) { + assert(pRaft != NULL); + free(pRaft); +} + +static int32_t doRaftPing(struct SRaft* ths, const RaftPing* pMsg) { return 0; } + +static int32_t onRaftPing(struct SRaft* ths, RaftPing* pMsg) { return 0; } + +static int32_t onRaftPingReply(struct SRaft* ths, RaftPingReply* pMsg) { return 0; } + +static int32_t doRaftRequestVote(struct SRaft* ths, const RaftRequestVote* pMsg) { return 0; } + +static int32_t onRaftRequestVote(struct SRaft* ths, RaftRequestVote* pMsg) { return 0; } + +static int32_t onRaftRequestVoteReply(struct SRaft* ths, RaftRequestVoteReply* pMsg) { return 0; } + +static int32_t doRaftAppendEntries(struct SRaft* ths, const RaftAppendEntries* pMsg) { return 0; } + +static int32_t onRaftAppendEntries(struct SRaft* ths, RaftAppendEntries* pMsg) { return 0; } + +static int32_t onRaftAppendEntriesReply(struct SRaft* ths, RaftAppendEntriesReply* pMsg) { return 0; } + int32_t raftPropose(SRaft* pRaft, const SSyncBuffer* pBuf, bool isWeak) { return 0; } static int raftSendMsg(SRaftId destRaftId, const void* pMsg, const SRaft* pRaft) { return 0; } diff --git a/source/libs/sync/src/syncRaftEntry.c b/source/libs/sync/src/syncRaftEntry.c index 738fc4c5e129562d368451c3f9d1eb3315060bb4..e525d3c7c26366aa5f64eb596ee0ad3e92ad9340 100644 --- a/source/libs/sync/src/syncRaftEntry.c +++ b/source/libs/sync/src/syncRaftEntry.c @@ -13,4 +13,4 @@ * along with this program. If not, see . */ -#include "sync.h" +#include "syncRaftEntry.h" diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 4a5fc201b07c01cf537f809064ad10618115b4d6..37bb3ce48cc5f7651a8c83d6457da12f11e9d42f 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -14,7 +14,6 @@ */ #include "syncRaftLog.h" -#include "sync.h" int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncBuffer* pBuf) { return 0; } diff --git a/source/libs/sync/src/syncRaftStore.c b/source/libs/sync/src/syncRaftStore.c index d45e53132cbc6d14f70f2c9d09948b9f09d283ce..964cc784905e5f22171b50413f9f78fc14a328de 100644 --- a/source/libs/sync/src/syncRaftStore.c +++ b/source/libs/sync/src/syncRaftStore.c @@ -14,12 +14,142 @@ */ #include "syncRaftStore.h" -#include "sync.h" +#include "cJSON.h" -int32_t currentTerm(SyncTerm *pCurrentTerm) { return 0; } +// to complie success: FileIO interface is modified -int32_t persistCurrentTerm(SyncTerm currentTerm) { return 0; } +SRaftStore *raftStoreOpen(const char *path) { return NULL;} -int32_t voteFor(SRaftId *pRaftId) { return 0; } +static int32_t raftStoreInit(SRaftStore *pRaftStore) { return 0;} -int32_t persistVoteFor(SRaftId *pRaftId) { return 0; } \ No newline at end of file +int32_t raftStoreClose(SRaftStore *pRaftStore) { return 0;} + +int32_t raftStorePersist(SRaftStore *pRaftStore) { return 0;} + +static bool raftStoreFileExist(char *path) { return 0;} + +int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) { return 0;} + +int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) { return 0;} + +void raftStorePrint(SRaftStore *pRaftStore) {} + + + +#if 0 + +SRaftStore *raftStoreOpen(const char *path) { + int32_t ret; + + SRaftStore *pRaftStore = malloc(sizeof(SRaftStore)); + if (pRaftStore == NULL) { + sError("raftStoreOpen malloc error"); + return NULL; + } + memset(pRaftStore, 0, sizeof(*pRaftStore)); + snprintf(pRaftStore->path, sizeof(pRaftStore->path), "%s", path); + + char storeBuf[RAFT_STORE_BLOCK_SIZE]; + memset(storeBuf, 0, sizeof(storeBuf)); + + if (!raftStoreFileExist(pRaftStore->path)) { + ret = raftStoreInit(pRaftStore); + assert(ret == 0); + } + + pRaftStore->fd = taosOpenFileReadWrite(pRaftStore->path); + if (pRaftStore->fd < 0) { + return NULL; + } + + int len = taosReadFile(pRaftStore->fd, storeBuf, sizeof(storeBuf)); + assert(len == RAFT_STORE_BLOCK_SIZE); + + ret = raftStoreDeserialize(pRaftStore, storeBuf, len); + assert(ret == 0); + + return pRaftStore; +} + +static int32_t raftStoreInit(SRaftStore *pRaftStore) { + pRaftStore->fd = taosOpenFileCreateWrite(pRaftStore->path); + if (pRaftStore->fd < 0) { + return -1; + } + + pRaftStore->currentTerm = 0; + pRaftStore->voteFor.addr = 0; + pRaftStore->voteFor.vgId = 0; + + int32_t ret = raftStorePersist(pRaftStore); + assert(ret == 0); + + taosCloseFile(pRaftStore->fd); + return 0; +} + +int32_t raftStoreClose(SRaftStore *pRaftStore) { + taosCloseFile(pRaftStore->fd); + free(pRaftStore); + return 0; +} + +int32_t raftStorePersist(SRaftStore *pRaftStore) { + int32_t ret; + char storeBuf[RAFT_STORE_BLOCK_SIZE]; + + ret = raftStoreSerialize(pRaftStore, storeBuf, sizeof(storeBuf)); + assert(ret == 0); + + taosLSeekFile(pRaftStore->fd, 0, SEEK_SET); + + ret = taosWriteFile(pRaftStore->fd, storeBuf, sizeof(storeBuf)); + assert(ret == RAFT_STORE_BLOCK_SIZE); + + fsync(pRaftStore->fd); + return 0; +} + +static bool raftStoreFileExist(char *path) { return taosStatFile(path, NULL, NULL) >= 0; } + +int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) { + cJSON *pRoot = cJSON_CreateObject(); + cJSON_AddNumberToObject(pRoot, "current_term", pRaftStore->currentTerm); + cJSON_AddNumberToObject(pRoot, "vote_for_addr", pRaftStore->voteFor.addr); + cJSON_AddNumberToObject(pRoot, "vote_for_vgid", pRaftStore->voteFor.vgId); + + char *serialized = cJSON_Print(pRoot); + int len2 = strlen(serialized); + assert(len2 < len); + memset(buf, 0, len); + snprintf(buf, len, "%s", serialized); + free(serialized); + + cJSON_Delete(pRoot); + return 0; +} + +int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) { + assert(len > 0 && len <= RAFT_STORE_BLOCK_SIZE); + cJSON *pRoot = cJSON_Parse(buf); + + cJSON *pCurrentTerm = cJSON_GetObjectItem(pRoot, "current_term"); + pRaftStore->currentTerm = pCurrentTerm->valueint; + + cJSON *pVoteForAddr = cJSON_GetObjectItem(pRoot, "vote_for_addr"); + pRaftStore->voteFor.addr = pVoteForAddr->valueint; + + cJSON *pVoteForVgid = cJSON_GetObjectItem(pRoot, "vote_for_vgid"); + pRaftStore->voteFor.vgId = pVoteForVgid->valueint; + + cJSON_Delete(pRoot); + return 0; +} + +void raftStorePrint(SRaftStore *pRaftStore) { + char storeBuf[RAFT_STORE_BLOCK_SIZE]; + raftStoreSerialize(pRaftStore, storeBuf, sizeof(storeBuf)); + printf("%s\n", storeBuf); +} + +#endif diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 738fc4c5e129562d368451c3f9d1eb3315060bb4..4cea7c150e19b490910adf771cfca050c03aea22 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -13,4 +13,4 @@ * along with this program. If not, see . */ -#include "sync.h" +#include "syncReplication.h" diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index c31ec0f34d521b214b7d45a3ce32c3890ebcabd5..7aee47b8e42ced91522c177d887785a6193328af 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -14,46 +14,41 @@ */ #include "syncRequestVote.h" -#include "sync.h" void requestVote(SRaft *pRaft, const SyncRequestVote *pMsg) { - -// TLA+ Spec -//RequestVote(i, j) == -// /\ state[i] = Candidate -// /\ j \notin votesResponded[i] -// /\ Send([mtype |-> RequestVoteRequest, -// mterm |-> currentTerm[i], -// mlastLogTerm |-> LastTerm(log[i]), -// mlastLogIndex |-> Len(log[i]), -// msource |-> i, -// mdest |-> j]) -// /\ UNCHANGED <> - + // TLA+ Spec + // RequestVote(i, j) == + // /\ state[i] = Candidate + // /\ j \notin votesResponded[i] + // /\ Send([mtype |-> RequestVoteRequest, + // mterm |-> currentTerm[i], + // mlastLogTerm |-> LastTerm(log[i]), + // mlastLogIndex |-> Len(log[i]), + // msource |-> i, + // mdest |-> j]) + // /\ UNCHANGED <> } void onRequestVote(SRaft *pRaft, const SyncRequestVote *pMsg) { - -// TLA+ Spec -//HandleRequestVoteRequest(i, j, m) == -// LET logOk == \/ m.mlastLogTerm > LastTerm(log[i]) -// \/ /\ m.mlastLogTerm = LastTerm(log[i]) -// /\ m.mlastLogIndex >= Len(log[i]) -// grant == /\ m.mterm = currentTerm[i] -// /\ logOk -// /\ votedFor[i] \in {Nil, j} -// IN /\ m.mterm <= currentTerm[i] -// /\ \/ grant /\ votedFor' = [votedFor EXCEPT ![i] = j] -// \/ ~grant /\ UNCHANGED votedFor -// /\ Reply([mtype |-> RequestVoteResponse, -// mterm |-> currentTerm[i], -// mvoteGranted |-> grant, -// \* mlog is used just for the `elections' history variable for -// \* the proof. It would not exist in a real implementation. -// mlog |-> log[i], -// msource |-> i, -// mdest |-> j], -// m) -// /\ UNCHANGED <> - + // TLA+ Spec + // HandleRequestVoteRequest(i, j, m) == + // LET logOk == \/ m.mlastLogTerm > LastTerm(log[i]) + // \/ /\ m.mlastLogTerm = LastTerm(log[i]) + // /\ m.mlastLogIndex >= Len(log[i]) + // grant == /\ m.mterm = currentTerm[i] + // /\ logOk + // /\ votedFor[i] \in {Nil, j} + // IN /\ m.mterm <= currentTerm[i] + // /\ \/ grant /\ votedFor' = [votedFor EXCEPT ![i] = j] + // \/ ~grant /\ UNCHANGED votedFor + // /\ Reply([mtype |-> RequestVoteResponse, + // mterm |-> currentTerm[i], + // mvoteGranted |-> grant, + // \* mlog is used just for the `elections' history variable for + // \* the proof. It would not exist in a real implementation. + // mlog |-> log[i], + // msource |-> i, + // mdest |-> j], + // m) + // /\ UNCHANGED <> } diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c index ba9787f00c156a7b205a87108dd5670ff0f921b3..a9c88a797545cc207d94c338ef9bc0f86f77e5c4 100644 --- a/source/libs/sync/src/syncRequestVoteReply.c +++ b/source/libs/sync/src/syncRequestVoteReply.c @@ -14,25 +14,22 @@ */ #include "syncRequestVoteReply.h" -#include "sync.h" void onRequestVoteReply(SRaft *pRaft, const SyncRequestVoteReply *pMsg) { - -// TLA+ Spec -//HandleRequestVoteResponse(i, j, m) == -// \* This tallies votes even when the current state is not Candidate, but -// \* they won't be looked at, so it doesn't matter. -// /\ m.mterm = currentTerm[i] -// /\ votesResponded' = [votesResponded EXCEPT ![i] = -// votesResponded[i] \cup {j}] -// /\ \/ /\ m.mvoteGranted -// /\ votesGranted' = [votesGranted EXCEPT ![i] = -// votesGranted[i] \cup {j}] -// /\ voterLog' = [voterLog EXCEPT ![i] = -// voterLog[i] @@ (j :> m.mlog)] -// \/ /\ ~m.mvoteGranted -// /\ UNCHANGED <> -// /\ Discard(m) -// /\ UNCHANGED <> - + // TLA+ Spec + // HandleRequestVoteResponse(i, j, m) == + // \* This tallies votes even when the current state is not Candidate, but + // \* they won't be looked at, so it doesn't matter. + // /\ m.mterm = currentTerm[i] + // /\ votesResponded' = [votesResponded EXCEPT ![i] = + // votesResponded[i] \cup {j}] + // /\ \/ /\ m.mvoteGranted + // /\ votesGranted' = [votesGranted EXCEPT ![i] = + // votesGranted[i] \cup {j}] + // /\ voterLog' = [voterLog EXCEPT ![i] = + // voterLog[i] @@ (j :> m.mlog)] + // \/ /\ ~m.mvoteGranted + // /\ UNCHANGED <> + // /\ Discard(m) + // /\ UNCHANGED <> } diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 8a27f097d1335a16d3033d091ba6e5221029a4b8..da194780ffdf17d5b511a2592b736aa927263b18 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -14,7 +14,6 @@ */ #include "syncSnapshot.h" -#include "sync.h" #include "syncRaft.h" int32_t takeSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) { return 0; } diff --git a/source/libs/sync/src/syncTimeout.c b/source/libs/sync/src/syncTimeout.c index 206dd70046b07f5597b96e819d8fd10ceee59433..e27df55d0712d16c6d396f7bb7f0acd6daee30c1 100644 --- a/source/libs/sync/src/syncTimeout.c +++ b/source/libs/sync/src/syncTimeout.c @@ -14,6 +14,5 @@ */ #include "syncTimeout.h" -#include "sync.h" void onTimeout(SRaft *pRaft, void *pMsg) {} \ No newline at end of file diff --git a/source/libs/sync/src/syncVoteMgr.c b/source/libs/sync/src/syncVoteMgr.c new file mode 100644 index 0000000000000000000000000000000000000000..02cf4ac03365eb8cedb49eff0b1ba1de08f4699f --- /dev/null +++ b/source/libs/sync/src/syncVoteMgr.c @@ -0,0 +1,16 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "syncVoteMgr.h" diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..e655ac01be5cef4af5ede6e5d2735bf67eb123c3 --- /dev/null +++ b/source/libs/sync/test/CMakeLists.txt @@ -0,0 +1,55 @@ +add_executable(syncTest "") +add_executable(syncEnvTest "") +add_executable(syncPingTest "") + + +target_sources(syncTest + PRIVATE + "syncTest.cpp" +) +target_sources(syncEnvTest + PRIVATE + "syncEnvTest.cpp" +) +target_sources(syncPingTest + PRIVATE + "syncPingTest.cpp" +) + + +target_include_directories(syncTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) +target_include_directories(syncEnvTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) +target_include_directories(syncPingTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) + + +target_link_libraries(syncTest + sync + gtest_main +) +target_link_libraries(syncEnvTest + sync + gtest_main +) +target_link_libraries(syncPingTest + sync + gtest_main +) + + +enable_testing() +add_test( + NAME sync_test + COMMAND syncTest +) diff --git a/source/libs/sync/test/syncEnvTest.cpp b/source/libs/sync/test/syncEnvTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..1d050e709410fd9fafdc52151850f0e4e76c0a2b --- /dev/null +++ b/source/libs/sync/test/syncEnvTest.cpp @@ -0,0 +1,56 @@ +#include "syncEnv.h" +#include +#include "syncIO.h" +#include "syncInt.h" +#include "syncRaftStore.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +void doSync() { + SSyncInfo syncInfo; + syncInfo.vgId = 1; + + SSyncCfg* pCfg = &syncInfo.syncCfg; + pCfg->replicaNum = 3; + + pCfg->nodeInfo[0].nodePort = 7010; + taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); + + pCfg->nodeInfo[1].nodePort = 7110; + taosGetFqdn(pCfg->nodeInfo[1].nodeFqdn); + + pCfg->nodeInfo[2].nodePort = 7210; + taosGetFqdn(pCfg->nodeInfo[2].nodeFqdn); + + SSyncNode* pSyncNode = syncNodeOpen(&syncInfo); + assert(pSyncNode != NULL); +} + +int main() { + //taosInitLog((char*)"syncEnvTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + + logTest(); + + int32_t ret = syncIOStart(); + assert(ret == 0); + + ret = syncEnvStart(); + assert(ret == 0); + + doSync(); + + while (1) { + taosMsleep(1000); + } + + return 0; +} diff --git a/source/libs/sync/test/syncPingTest.cpp b/source/libs/sync/test/syncPingTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..e62d0519463df03247faa877d18bd8c7d03217a7 --- /dev/null +++ b/source/libs/sync/test/syncPingTest.cpp @@ -0,0 +1,65 @@ +#include +#include "syncEnv.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncRaftStore.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +void doSync() { + SSyncFSM* pFsm; + + SSyncInfo syncInfo; + syncInfo.vgId = 1; + syncInfo.FpSendMsg = syncIOSendMsg; + syncInfo.pFsm = pFsm; + snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_sync_ping"); + + SSyncCfg* pCfg = &syncInfo.syncCfg; + pCfg->myIndex = 0; + pCfg->replicaNum = 3; + + pCfg->nodeInfo[0].nodePort = 7010; + taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); + + pCfg->nodeInfo[1].nodePort = 7110; + taosGetFqdn(pCfg->nodeInfo[1].nodeFqdn); + + pCfg->nodeInfo[2].nodePort = 7210; + taosGetFqdn(pCfg->nodeInfo[2].nodeFqdn); + + SSyncNode* pSyncNode = syncNodeOpen(&syncInfo); + assert(pSyncNode != NULL); + + gSyncIO->FpOnPing = pSyncNode->FpOnPing; + gSyncIO->pSyncNode = pSyncNode; +} + +int main() { + //taosInitLog((char*)"syncPingTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + + logTest(); + + int32_t ret = syncIOStart(); + assert(ret == 0); + + ret = syncEnvStart(); + assert(ret == 0); + + doSync(); + + while (1) { + taosMsleep(1000); + } + + return 0; +} diff --git a/source/libs/sync/test/syncTest.cpp b/source/libs/sync/test/syncTest.cpp index 47566d537e74b4369806f11977220288aff2b00f..035ed5462900e4dc0e632714324812c81c7d924e 100644 --- a/source/libs/sync/test/syncTest.cpp +++ b/source/libs/sync/test/syncTest.cpp @@ -1,7 +1,57 @@ #include +#include "syncIO.h" +#include "syncInt.h" +#include "syncRaftStore.h" -int main() { - printf("test \n"); - return 0; +void *pingFunc(void *param) { + SSyncIO *io = (SSyncIO *)param; + while (1) { + sDebug("io->ping"); + io->ping(io); + sleep(1); + } + return NULL; } +int main() { + //taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + + sTrace("sync log test: trace"); + sDebug("sync log test: debug"); + sInfo("sync log test: info"); + sWarn("sync log test: warn"); + sError("sync log test: error"); + sFatal("sync log test: fatal"); + + SRaftStore *pRaftStore = raftStoreOpen("./raft_store.json"); + assert(pRaftStore != NULL); + + raftStorePrint(pRaftStore); + + pRaftStore->currentTerm = 100; + pRaftStore->voteFor.addr = 200; + pRaftStore->voteFor.vgId = 300; + + raftStorePrint(pRaftStore); + + raftStorePersist(pRaftStore); + + sDebug("sync test"); + + SSyncIO *syncIO = syncIOCreate(); + assert(syncIO != NULL); + + syncIO->start(syncIO); + + sleep(2); + + pthread_t tid; + pthread_create(&tid, NULL, pingFunc, syncIO); + + while (1) { + sleep(1); + } + return 0; +}