未验证 提交 8dc88e4d 编写于 作者: L Li Minghao 提交者: GitHub

Merge pull request #10430 from taosdata/feature/3.0_mhli

Feature/3.0 mhli
...@@ -21,7 +21,9 @@ extern "C" { ...@@ -21,7 +21,9 @@ extern "C" {
#endif #endif
#include <stdint.h> #include <stdint.h>
#include <tep.h>
#include "taosdef.h" #include "taosdef.h"
#include "trpc.h"
typedef uint64_t SyncNodeId; typedef uint64_t SyncNodeId;
typedef int32_t SyncGroupId; typedef int32_t SyncGroupId;
...@@ -34,23 +36,23 @@ typedef enum { ...@@ -34,23 +36,23 @@ typedef enum {
TAOS_SYNC_STATE_LEADER = 2, TAOS_SYNC_STATE_LEADER = 2,
} ESyncState; } ESyncState;
typedef struct { typedef struct SSyncBuffer {
void* data; void* data;
size_t len; size_t len;
} SSyncBuffer; } SSyncBuffer;
typedef struct { typedef struct SNodeInfo {
SyncNodeId nodeId; uint16_t nodePort; // node sync Port
uint16_t nodePort; // node sync Port char nodeFqdn[TSDB_FQDN_LEN]; // node FQDN
char nodeFqdn[TSDB_FQDN_LEN]; // node FQDN
} SNodeInfo; } SNodeInfo;
typedef struct { typedef struct SSyncCfg {
int32_t replicaNum; int32_t replicaNum;
int32_t myIndex;
SNodeInfo nodeInfo[TSDB_MAX_REPLICA]; SNodeInfo nodeInfo[TSDB_MAX_REPLICA];
} SSyncCfg; } SSyncCfg;
typedef struct { typedef struct SNodesRole {
int32_t replicaNum; int32_t replicaNum;
SNodeInfo nodeInfo[TSDB_MAX_REPLICA]; SNodeInfo nodeInfo[TSDB_MAX_REPLICA];
ESyncState role[TSDB_MAX_REPLICA]; ESyncState role[TSDB_MAX_REPLICA];
...@@ -128,12 +130,12 @@ typedef struct SStateMgr { ...@@ -128,12 +130,12 @@ typedef struct SStateMgr {
} SStateMgr; } SStateMgr;
typedef struct { typedef struct SSyncInfo {
SyncGroupId vgId; SyncGroupId vgId;
SSyncCfg syncCfg; SSyncCfg syncCfg;
SSyncLogStore logStore; char path[TSDB_FILENAME_LEN];
SStateMgr stateManager; SSyncFSM* pFsm;
SSyncFSM syncFsm; int32_t (*FpSendMsg)(void* handle, const SEpSet* pEpSet, SRpcMsg* pMsg);
} SSyncInfo; } SSyncInfo;
......
...@@ -13,4 +13,8 @@ target_include_directories( ...@@ -13,4 +13,8 @@ target_include_directories(
sync sync
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/sync" PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/sync"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
) )
\ No newline at end of file
if(${BUILD_TEST})
add_subdirectory(test)
endif(${BUILD_TEST})
...@@ -23,6 +23,7 @@ extern "C" { ...@@ -23,6 +23,7 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include "syncInt.h"
#include "syncMessage.h" #include "syncMessage.h"
#include "syncRaft.h" #include "syncRaft.h"
#include "taosdef.h" #include "taosdef.h"
......
...@@ -23,6 +23,7 @@ extern "C" { ...@@ -23,6 +23,7 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include "syncInt.h"
#include "syncMessage.h" #include "syncMessage.h"
#include "syncRaft.h" #include "syncRaft.h"
#include "taosdef.h" #include "taosdef.h"
......
...@@ -23,6 +23,7 @@ extern "C" { ...@@ -23,6 +23,7 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include "syncInt.h"
#include "taosdef.h" #include "taosdef.h"
#ifdef __cplusplus #ifdef __cplusplus
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_LIBS_SYNC_ENV_H
#define _TD_LIBS_SYNC_ENV_H
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "syncInt.h"
#include "taosdef.h"
#include "trpc.h"
typedef struct SSyncEnv {
void *pTimer;
void *pTimerManager;
} SSyncEnv;
int32_t syncEnvStart();
int32_t syncEnvStop();
static int32_t doSyncEnvStart(SSyncEnv *pSyncEnv);
static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv);
#ifdef __cplusplus
}
#endif
#endif /*_TD_LIBS_SYNC_ENV_H*/
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_LIBS_IO_H
#define _TD_LIBS_IO_H
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "os.h"
#include "syncInt.h"
#include "taosdef.h"
#include "tqueue.h"
#include "trpc.h"
typedef struct SSyncIO {
void * serverRpc;
void * clientRpc;
STaosQueue *pMsgQ;
STaosQset * pQset;
pthread_t tid;
int8_t isStart;
SEpSet epSet;
void *syncTimer;
void *syncTimerManager;
int32_t (*start)(struct SSyncIO *ths);
int32_t (*stop)(struct SSyncIO *ths);
int32_t (*ping)(struct SSyncIO *ths);
int32_t (*onMsg)(struct SSyncIO *ths, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t (*destroy)(struct SSyncIO *ths);
void *pSyncNode;
int32_t (*FpOnPing)(struct SSyncNode *ths, SyncPing *pMsg);
} SSyncIO;
extern SSyncIO *gSyncIO;
int32_t syncIOStart();
int32_t syncIOStop();
int32_t syncIOSendMsg(void *handle, const SEpSet *pEpSet, SRpcMsg *pMsg);
SSyncIO *syncIOCreate();
static int32_t doSyncIOStart(SSyncIO *io);
static int32_t doSyncIOStop(SSyncIO *io);
static int32_t doSyncIOPing(SSyncIO *io);
static int32_t doSyncIOOnMsg(struct SSyncIO *io, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
static int32_t doSyncIODestroy(SSyncIO *io);
#ifdef __cplusplus
}
#endif
#endif /*_TD_LIBS_IO_H*/
...@@ -23,7 +23,11 @@ extern "C" { ...@@ -23,7 +23,11 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include "sync.h"
#include "taosdef.h" #include "taosdef.h"
#include "tlog.h"
extern int32_t sDebugFlag;
#define sFatal(...) \ #define sFatal(...) \
{ \ { \
...@@ -62,16 +66,81 @@ extern "C" { ...@@ -62,16 +66,81 @@ extern "C" {
} \ } \
} }
struct SRaft;
typedef struct SRaft SRaft;
struct SyncPing;
typedef struct SyncPing SyncPing;
struct SyncPingReply;
typedef struct SyncPingReply SyncPingReply;
struct SyncRequestVote;
typedef struct SyncRequestVote SyncRequestVote;
struct SyncRequestVoteReply;
typedef struct SyncRequestVoteReply SyncRequestVoteReply;
struct SyncAppendEntries;
typedef struct SyncAppendEntries SyncAppendEntries;
struct SyncAppendEntriesReply;
typedef struct SyncAppendEntriesReply SyncAppendEntriesReply;
typedef struct SSyncNode { typedef struct SSyncNode {
char path[TSDB_FILENAME_LEN]; int8_t replica;
int8_t replica; int8_t quorum;
int8_t quorum;
int8_t selfIndex; SyncGroupId vgId;
uint32_t vgId; SSyncCfg syncCfg;
int32_t refCount; char path[TSDB_FILENAME_LEN];
int64_t rid;
SRaft* pRaft;
int32_t (*FpPing)(struct SSyncNode* ths, const SyncPing* pMsg);
int32_t (*FpOnPing)(struct SSyncNode* ths, SyncPing* pMsg);
int32_t (*FpOnPingReply)(struct SSyncNode* ths, SyncPingReply* pMsg);
int32_t (*FpRequestVote)(struct SSyncNode* ths, const SyncRequestVote* pMsg);
int32_t (*FpOnRequestVote)(struct SSyncNode* ths, SyncRequestVote* pMsg);
int32_t (*FpOnRequestVoteReply)(struct SSyncNode* ths, SyncRequestVoteReply* pMsg);
int32_t (*FpAppendEntries)(struct SSyncNode* ths, const SyncAppendEntries* pMsg);
int32_t (*FpOnAppendEntries)(struct SSyncNode* ths, SyncAppendEntries* pMsg);
int32_t (*FpOnAppendEntriesReply)(struct SSyncNode* ths, SyncAppendEntriesReply* pMsg);
int32_t (*FpSendMsg)(void* handle, const SEpSet* pEpSet, SRpcMsg* pMsg);
} SSyncNode; } SSyncNode;
SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo);
void syncNodeClose(SSyncNode* pSyncNode);
static int32_t doSyncNodePing(struct SSyncNode* ths, const SyncPing* pMsg);
static int32_t onSyncNodePing(struct SSyncNode* ths, SyncPing* pMsg);
static int32_t onSyncNodePingReply(struct SSyncNode* ths, SyncPingReply* pMsg);
static int32_t doSyncNodeRequestVote(struct SSyncNode* ths, const SyncRequestVote* pMsg);
static int32_t onSyncNodeRequestVote(struct SSyncNode* ths, SyncRequestVote* pMsg);
static int32_t onSyncNodeRequestVoteReply(struct SSyncNode* ths, SyncRequestVoteReply* pMsg);
static int32_t doSyncNodeAppendEntries(struct SSyncNode* ths, const SyncAppendEntries* pMsg);
static int32_t onSyncNodeAppendEntries(struct SSyncNode* ths, SyncAppendEntries* pMsg);
static int32_t onSyncNodeAppendEntriesReply(struct SSyncNode* ths, SyncAppendEntriesReply* pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -41,26 +41,26 @@ typedef enum ESyncMessageType { ...@@ -41,26 +41,26 @@ typedef enum ESyncMessageType {
typedef struct SyncPing { typedef struct SyncPing {
ESyncMessageType msgType; ESyncMessageType msgType;
const SSyncBuffer *pData; const SSyncBuffer *pData;
} SyncPing; } SyncPing, RaftPing;
typedef struct SyncPingReply { typedef struct SyncPingReply {
ESyncMessageType msgType; ESyncMessageType msgType;
const SSyncBuffer *pData; const SSyncBuffer *pData;
} SyncPingReply; } SyncPingReply, RaftPingReply;
typedef struct SyncClientRequest { typedef struct SyncClientRequest {
ESyncMessageType msgType; ESyncMessageType msgType;
const SSyncBuffer *pData; const SSyncBuffer *pData;
int64_t seqNum; int64_t seqNum;
bool isWeak; bool isWeak;
} SyncClientRequest; } SyncClientRequest, RaftClientRequest;
typedef struct SyncClientRequestReply { typedef struct SyncClientRequestReply {
ESyncMessageType msgType; ESyncMessageType msgType;
int32_t errCode; int32_t errCode;
const SSyncBuffer *pErrMsg; const SSyncBuffer *pErrMsg;
const SSyncBuffer *pLeaderHint; const SSyncBuffer *pLeaderHint;
} SyncClientRequestReply; } SyncClientRequestReply, RaftClientRequestReply;
typedef struct SyncRequestVote { typedef struct SyncRequestVote {
ESyncMessageType msgType; ESyncMessageType msgType;
...@@ -69,7 +69,7 @@ typedef struct SyncRequestVote { ...@@ -69,7 +69,7 @@ typedef struct SyncRequestVote {
SyncGroupId vgId; SyncGroupId vgId;
SyncIndex lastLogIndex; SyncIndex lastLogIndex;
SyncTerm lastLogTerm; SyncTerm lastLogTerm;
} SyncRequestVote; } SyncRequestVote, RaftRequestVote;
typedef struct SyncRequestVoteReply { typedef struct SyncRequestVoteReply {
ESyncMessageType msgType; ESyncMessageType msgType;
...@@ -77,7 +77,7 @@ typedef struct SyncRequestVoteReply { ...@@ -77,7 +77,7 @@ typedef struct SyncRequestVoteReply {
SyncNodeId nodeId; SyncNodeId nodeId;
SyncGroupId vgId; SyncGroupId vgId;
bool voteGranted; bool voteGranted;
} SyncRequestVoteReply; } SyncRequestVoteReply, RaftRequestVoteReply;
typedef struct SyncAppendEntries { typedef struct SyncAppendEntries {
ESyncMessageType msgType; ESyncMessageType msgType;
...@@ -88,7 +88,7 @@ typedef struct SyncAppendEntries { ...@@ -88,7 +88,7 @@ typedef struct SyncAppendEntries {
int32_t entryCount; int32_t entryCount;
SSyncRaftEntry * logEntries; SSyncRaftEntry * logEntries;
SyncIndex commitIndex; SyncIndex commitIndex;
} SyncAppendEntries; } SyncAppendEntries, RaftAppendEntries;
typedef struct SyncAppendEntriesReply { typedef struct SyncAppendEntriesReply {
ESyncMessageType msgType; ESyncMessageType msgType;
...@@ -96,7 +96,7 @@ typedef struct SyncAppendEntriesReply { ...@@ -96,7 +96,7 @@ typedef struct SyncAppendEntriesReply {
SyncNodeId nodeId; SyncNodeId nodeId;
bool success; bool success;
SyncIndex matchIndex; SyncIndex matchIndex;
} SyncAppendEntriesReply; } SyncAppendEntriesReply, RaftAppendEntriesReply;
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -28,14 +28,56 @@ extern "C" { ...@@ -28,14 +28,56 @@ extern "C" {
#include "taosdef.h" #include "taosdef.h"
typedef struct SRaftId { typedef struct SRaftId {
SyncNodeId nodeId; SyncNodeId addr;
SyncGroupId vgId; SyncGroupId vgId;
} SRaftId; } SRaftId;
typedef struct SRaft { typedef struct SRaft {
SRaftId id; SRaftId id;
SSyncFSM* pFsm;
int32_t (*FpPing)(struct SRaft* ths, const RaftPing* pMsg);
int32_t (*FpOnPing)(struct SRaft* ths, RaftPing* pMsg);
int32_t (*FpOnPingReply)(struct SRaft* ths, RaftPingReply* pMsg);
int32_t (*FpRequestVote)(struct SRaft* ths, const RaftRequestVote* pMsg);
int32_t (*FpOnRequestVote)(struct SRaft* ths, RaftRequestVote* pMsg);
int32_t (*FpOnRequestVoteReply)(struct SRaft* ths, RaftRequestVoteReply* pMsg);
int32_t (*FpAppendEntries)(struct SRaft* ths, const RaftAppendEntries* pMsg);
int32_t (*FpOnAppendEntries)(struct SRaft* ths, RaftAppendEntries* pMsg);
int32_t (*FpOnAppendEntriesReply)(struct SRaft* ths, RaftAppendEntriesReply* pMsg);
} SRaft; } SRaft;
SRaft* raftOpen(SRaftId raftId, SSyncFSM* pFsm);
void raftClose(SRaft* pRaft);
static int32_t doRaftPing(struct SRaft* ths, const RaftPing* pMsg);
static int32_t onRaftPing(struct SRaft* ths, RaftPing* pMsg);
static int32_t onRaftPingReply(struct SRaft* ths, RaftPingReply* pMsg);
static int32_t doRaftRequestVote(struct SRaft* ths, const RaftRequestVote* pMsg);
static int32_t onRaftRequestVote(struct SRaft* ths, RaftRequestVote* pMsg);
static int32_t onRaftRequestVoteReply(struct SRaft* ths, RaftRequestVoteReply* pMsg);
static int32_t doRaftAppendEntries(struct SRaft* ths, const RaftAppendEntries* pMsg);
static int32_t onRaftAppendEntries(struct SRaft* ths, RaftAppendEntries* pMsg);
static int32_t onRaftAppendEntriesReply(struct SRaft* ths, RaftAppendEntriesReply* pMsg);
int32_t raftPropose(SRaft* pRaft, const SSyncBuffer* pBuf, bool isWeak); int32_t raftPropose(SRaft* pRaft, const SSyncBuffer* pBuf, bool isWeak);
static int raftSendMsg(SRaftId destRaftId, const void* pMsg, const SRaft* pRaft); static int raftSendMsg(SRaftId destRaftId, const void* pMsg, const SRaft* pRaft);
......
...@@ -13,8 +13,8 @@ ...@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef _TD_LIBS_TPL_H #ifndef _TD_LIBS_SYNC_RAFT_ENTRY_H
#define _TD_LIBS_TPL_H #define _TD_LIBS_SYNC_RAFT_ENTRY_H
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -23,7 +23,7 @@ extern "C" { ...@@ -23,7 +23,7 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include "sync.h" #include "syncInt.h"
#include "taosdef.h" #include "taosdef.h"
typedef struct SSyncRaftEntry { typedef struct SSyncRaftEntry {
...@@ -37,4 +37,4 @@ typedef struct SSyncRaftEntry { ...@@ -37,4 +37,4 @@ typedef struct SSyncRaftEntry {
} }
#endif #endif
#endif /*_TD_LIBS_TPL_H*/ #endif /*_TD_LIBS_SYNC_RAFT_ENTRY_H*/
...@@ -23,7 +23,7 @@ extern "C" { ...@@ -23,7 +23,7 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include "sync.h" #include "syncInt.h"
#include "taosdef.h" #include "taosdef.h"
int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncBuffer* pBuf); int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncBuffer* pBuf);
......
...@@ -23,17 +23,36 @@ extern "C" { ...@@ -23,17 +23,36 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include "sync.h" #include "cJSON.h"
#include "syncInt.h"
#include "syncRaft.h" #include "syncRaft.h"
#include "taosdef.h" #include "taosdef.h"
int32_t currentTerm(SyncTerm *pCurrentTerm); #define RAFT_STORE_BLOCK_SIZE 512
#define RAFT_STORE_PATH_LEN 128
int32_t persistCurrentTerm(SyncTerm currentTerm); typedef struct SRaftStore {
SyncTerm currentTerm;
SRaftId voteFor;
//FileFd fd;
char path[RAFT_STORE_PATH_LEN];
} SRaftStore;
int32_t voteFor(SRaftId *pRaftId); SRaftStore *raftStoreOpen(const char *path);
int32_t persistVoteFor(SRaftId *pRaftId); static int32_t raftStoreInit(SRaftStore *pRaftStore);
int32_t raftStoreClose(SRaftStore *pRaftStore);
int32_t raftStorePersist(SRaftStore *pRaftStore);
static bool raftStoreFileExist(char *path);
int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len);
int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len);
void raftStorePrint(SRaftStore *pRaftStore);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -23,6 +23,7 @@ extern "C" { ...@@ -23,6 +23,7 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include "syncInt.h"
#include "taosdef.h" #include "taosdef.h"
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -23,6 +23,7 @@ extern "C" { ...@@ -23,6 +23,7 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include "syncInt.h"
#include "syncMessage.h" #include "syncMessage.h"
#include "syncRaft.h" #include "syncRaft.h"
#include "taosdef.h" #include "taosdef.h"
......
...@@ -23,6 +23,7 @@ extern "C" { ...@@ -23,6 +23,7 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include "syncInt.h"
#include "syncMessage.h" #include "syncMessage.h"
#include "syncRaft.h" #include "syncRaft.h"
#include "taosdef.h" #include "taosdef.h"
......
...@@ -23,7 +23,7 @@ extern "C" { ...@@ -23,7 +23,7 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include "sync.h" #include "syncInt.h"
#include "syncRaft.h" #include "syncRaft.h"
#include "taosdef.h" #include "taosdef.h"
......
...@@ -23,6 +23,7 @@ extern "C" { ...@@ -23,6 +23,7 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include "syncInt.h"
#include "syncMessage.h" #include "syncMessage.h"
#include "syncRaft.h" #include "syncRaft.h"
#include "taosdef.h" #include "taosdef.h"
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_LIBS_SYNC_VOTG_MGR_H
#define _TD_LIBS_SYNC_VOTG_MGR_H
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "syncInt.h"
#include "taosdef.h"
#ifdef __cplusplus
}
#endif
#endif /*_TD_LIBS_SYNC_VOTG_MGR_H*/
...@@ -14,104 +14,98 @@ ...@@ -14,104 +14,98 @@
*/ */
#include "syncAppendEntries.h" #include "syncAppendEntries.h"
#include "sync.h"
void appendEntries(SRaft *pRaft, const SyncAppendEntries *pMsg) { void appendEntries(SRaft *pRaft, const SyncAppendEntries *pMsg) {
// TLA+ Spec
// TLA+ Spec // AppendEntries(i, j) ==
//AppendEntries(i, j) == // /\ i /= j
// /\ i /= j // /\ state[i] = Leader
// /\ state[i] = Leader // /\ LET prevLogIndex == nextIndex[i][j] - 1
// /\ LET prevLogIndex == nextIndex[i][j] - 1 // prevLogTerm == IF prevLogIndex > 0 THEN
// prevLogTerm == IF prevLogIndex > 0 THEN // log[i][prevLogIndex].term
// log[i][prevLogIndex].term // ELSE
// ELSE // 0
// 0 // \* Send up to 1 entry, constrained by the end of the log.
// \* Send up to 1 entry, constrained by the end of the log. // lastEntry == Min({Len(log[i]), nextIndex[i][j]})
// lastEntry == Min({Len(log[i]), nextIndex[i][j]}) // entries == SubSeq(log[i], nextIndex[i][j], lastEntry)
// entries == SubSeq(log[i], nextIndex[i][j], lastEntry) // IN Send([mtype |-> AppendEntriesRequest,
// IN Send([mtype |-> AppendEntriesRequest, // mterm |-> currentTerm[i],
// mterm |-> currentTerm[i], // mprevLogIndex |-> prevLogIndex,
// mprevLogIndex |-> prevLogIndex, // mprevLogTerm |-> prevLogTerm,
// mprevLogTerm |-> prevLogTerm, // mentries |-> entries,
// mentries |-> entries, // \* mlog is used as a history variable for the proof.
// \* mlog is used as a history variable for the proof. // \* It would not exist in a real implementation.
// \* It would not exist in a real implementation. // mlog |-> log[i],
// mlog |-> log[i], // mcommitIndex |-> Min({commitIndex[i], lastEntry}),
// mcommitIndex |-> Min({commitIndex[i], lastEntry}), // msource |-> i,
// msource |-> i, // mdest |-> j])
// mdest |-> j]) // /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
} }
void onAppendEntries(SRaft *pRaft, const SyncAppendEntries *pMsg) { void onAppendEntries(SRaft *pRaft, const SyncAppendEntries *pMsg) {
// TLA+ Spec
// TLA+ Spec // HandleAppendEntriesRequest(i, j, m) ==
//HandleAppendEntriesRequest(i, j, m) == // LET logOk == \/ m.mprevLogIndex = 0
// LET logOk == \/ m.mprevLogIndex = 0 // \/ /\ m.mprevLogIndex > 0
// \/ /\ m.mprevLogIndex > 0 // /\ m.mprevLogIndex <= Len(log[i])
// /\ m.mprevLogIndex <= Len(log[i]) // /\ m.mprevLogTerm = log[i][m.mprevLogIndex].term
// /\ m.mprevLogTerm = log[i][m.mprevLogIndex].term // IN /\ m.mterm <= currentTerm[i]
// IN /\ m.mterm <= currentTerm[i] // /\ \/ /\ \* reject request
// /\ \/ /\ \* reject request // \/ m.mterm < currentTerm[i]
// \/ m.mterm < currentTerm[i] // \/ /\ m.mterm = currentTerm[i]
// \/ /\ m.mterm = currentTerm[i] // /\ state[i] = Follower
// /\ state[i] = Follower // /\ \lnot logOk
// /\ \lnot logOk // /\ Reply([mtype |-> AppendEntriesResponse,
// /\ Reply([mtype |-> AppendEntriesResponse, // mterm |-> currentTerm[i],
// mterm |-> currentTerm[i], // msuccess |-> FALSE,
// msuccess |-> FALSE, // mmatchIndex |-> 0,
// mmatchIndex |-> 0, // msource |-> i,
// msource |-> i, // mdest |-> j],
// mdest |-> j], // m)
// m) // /\ UNCHANGED <<serverVars, logVars>>
// /\ UNCHANGED <<serverVars, logVars>> // \/ \* return to follower state
// \/ \* return to follower state // /\ m.mterm = currentTerm[i]
// /\ m.mterm = currentTerm[i] // /\ state[i] = Candidate
// /\ state[i] = Candidate // /\ state' = [state EXCEPT ![i] = Follower]
// /\ state' = [state EXCEPT ![i] = Follower] // /\ UNCHANGED <<currentTerm, votedFor, logVars, messages>>
// /\ UNCHANGED <<currentTerm, votedFor, logVars, messages>> // \/ \* accept request
// \/ \* accept request // /\ m.mterm = currentTerm[i]
// /\ m.mterm = currentTerm[i] // /\ state[i] = Follower
// /\ state[i] = Follower // /\ logOk
// /\ logOk // /\ LET index == m.mprevLogIndex + 1
// /\ LET index == m.mprevLogIndex + 1 // IN \/ \* already done with request
// IN \/ \* already done with request // /\ \/ m.mentries = << >>
// /\ \/ m.mentries = << >> // \/ /\ m.mentries /= << >>
// \/ /\ m.mentries /= << >> // /\ Len(log[i]) >= index
// /\ Len(log[i]) >= index // /\ log[i][index].term = m.mentries[1].term
// /\ log[i][index].term = m.mentries[1].term // \* This could make our commitIndex decrease (for
// \* This could make our commitIndex decrease (for // \* example if we process an old, duplicated request),
// \* example if we process an old, duplicated request), // \* but that doesn't really affect anything.
// \* but that doesn't really affect anything. // /\ commitIndex' = [commitIndex EXCEPT ![i] =
// /\ commitIndex' = [commitIndex EXCEPT ![i] = // m.mcommitIndex]
// m.mcommitIndex] // /\ Reply([mtype |-> AppendEntriesResponse,
// /\ Reply([mtype |-> AppendEntriesResponse, // mterm |-> currentTerm[i],
// mterm |-> currentTerm[i], // msuccess |-> TRUE,
// msuccess |-> TRUE, // mmatchIndex |-> m.mprevLogIndex +
// mmatchIndex |-> m.mprevLogIndex + // Len(m.mentries),
// Len(m.mentries), // msource |-> i,
// msource |-> i, // mdest |-> j],
// mdest |-> j], // m)
// m) // /\ UNCHANGED <<serverVars, log>>
// /\ UNCHANGED <<serverVars, log>> // \/ \* conflict: remove 1 entry
// \/ \* conflict: remove 1 entry // /\ m.mentries /= << >>
// /\ m.mentries /= << >> // /\ Len(log[i]) >= index
// /\ Len(log[i]) >= index // /\ log[i][index].term /= m.mentries[1].term
// /\ log[i][index].term /= m.mentries[1].term // /\ LET new == [index2 \in 1..(Len(log[i]) - 1) |->
// /\ LET new == [index2 \in 1..(Len(log[i]) - 1) |-> // log[i][index2]]
// log[i][index2]] // IN log' = [log EXCEPT ![i] = new]
// IN log' = [log EXCEPT ![i] = new] // /\ UNCHANGED <<serverVars, commitIndex, messages>>
// /\ UNCHANGED <<serverVars, commitIndex, messages>> // \/ \* no conflict: append entry
// \/ \* no conflict: append entry // /\ m.mentries /= << >>
// /\ m.mentries /= << >> // /\ Len(log[i]) = m.mprevLogIndex
// /\ Len(log[i]) = m.mprevLogIndex // /\ log' = [log EXCEPT ![i] =
// /\ log' = [log EXCEPT ![i] = // Append(log[i], m.mentries[1])]
// Append(log[i], m.mentries[1])] // /\ UNCHANGED <<serverVars, commitIndex, messages>>
// /\ UNCHANGED <<serverVars, commitIndex, messages>> // /\ UNCHANGED <<candidateVars, leaderVars>>
// /\ UNCHANGED <<candidateVars, leaderVars>> //
//
} }
...@@ -14,21 +14,18 @@ ...@@ -14,21 +14,18 @@
*/ */
#include "syncAppendEntriesReply.h" #include "syncAppendEntriesReply.h"
#include "sync.h"
void onAppendEntriesReply(SRaft *pRaft, const SyncAppendEntriesReply *pMsg) { void onAppendEntriesReply(SRaft *pRaft, const SyncAppendEntriesReply *pMsg) {
// TLA+ Spec
// TLA+ Spec // HandleAppendEntriesResponse(i, j, m) ==
//HandleAppendEntriesResponse(i, j, m) == // /\ m.mterm = currentTerm[i]
// /\ m.mterm = currentTerm[i] // /\ \/ /\ m.msuccess \* successful
// /\ \/ /\ m.msuccess \* successful // /\ nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1]
// /\ nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1] // /\ matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex]
// /\ matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex] // \/ /\ \lnot m.msuccess \* not successful
// \/ /\ \lnot m.msuccess \* not successful // /\ nextIndex' = [nextIndex EXCEPT ![i][j] =
// /\ nextIndex' = [nextIndex EXCEPT ![i][j] = // Max({nextIndex[i][j] - 1, 1})]
// Max({nextIndex[i][j] - 1, 1})] // /\ UNCHANGED <<matchIndex>>
// /\ UNCHANGED <<matchIndex>> // /\ Discard(m)
// /\ Discard(m) // /\ UNCHANGED <<serverVars, candidateVars, logVars, elections>>
// /\ UNCHANGED <<serverVars, candidateVars, logVars, elections>>
} }
...@@ -13,4 +13,4 @@ ...@@ -13,4 +13,4 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "sync.h" #include "syncElection.h"
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "syncEnv.h"
#include <assert.h>
SSyncEnv *gSyncEnv = NULL;
int32_t syncEnvStart() {
int32_t ret;
gSyncEnv = (SSyncEnv *)malloc(sizeof(SSyncEnv));
assert(gSyncEnv != NULL);
ret = doSyncEnvStart(gSyncEnv);
return ret;
}
int32_t syncEnvStop() {
int32_t ret = doSyncEnvStop(gSyncEnv);
return ret;
}
static int32_t doSyncEnvStart(SSyncEnv *pSyncEnv) { return 0; }
static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv) { return 0; }
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "syncIO.h"
#include <tep.h>
#include "syncOnMessage.h"
#include "tglobal.h"
#include "ttimer.h"
#include "tutil.h"
SSyncIO *gSyncIO = NULL;
int32_t syncIOSendMsg(void *handle, const SEpSet *pEpSet, SRpcMsg *pMsg) { return 0; }
int32_t syncIOStart() { return 0; }
int32_t syncIOStop() { return 0; }
static void syncTick(void *param, void *tmrId) {
SSyncIO *io = (SSyncIO *)param;
sDebug("syncTick ... ");
SRpcMsg rpcMsg;
rpcMsg.pCont = rpcMallocCont(10);
snprintf(rpcMsg.pCont, 10, "TICK");
rpcMsg.contLen = 10;
rpcMsg.handle = NULL;
rpcMsg.msgType = 2;
SRpcMsg *pTemp;
pTemp = taosAllocateQitem(sizeof(SRpcMsg));
memcpy(pTemp, &rpcMsg, sizeof(SRpcMsg));
taosWriteQitem(io->pMsgQ, pTemp);
io->syncTimer = taosTmrStart(syncTick, 1000, io, io->syncTimerManager);
}
void *syncConsumer(void *param) {
SSyncIO *io = param;
STaosQall *qall;
SRpcMsg * pRpcMsg, rpcMsg;
int type;
qall = taosAllocateQall();
while (1) {
int numOfMsgs = taosReadAllQitemsFromQset(io->pQset, qall, NULL, NULL);
sDebug("%d sync-io msgs are received", numOfMsgs);
if (numOfMsgs <= 0) break;
for (int i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pRpcMsg);
sDebug("sync-io recv type:%d msg:%s", pRpcMsg->msgType, (char *)(pRpcMsg->pCont));
}
taosResetQitems(qall);
for (int i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pRpcMsg);
rpcFreeCont(pRpcMsg->pCont);
if (pRpcMsg->handle != NULL) {
int msgSize = 128;
memset(&rpcMsg, 0, sizeof(rpcMsg));
rpcMsg.pCont = rpcMallocCont(msgSize);
rpcMsg.contLen = msgSize;
rpcMsg.handle = pRpcMsg->handle;
rpcMsg.code = 0;
rpcSendResponse(&rpcMsg);
}
taosFreeQitem(pRpcMsg);
}
}
taosFreeQall(qall);
return NULL;
}
static int retrieveAuthInfo(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey) {
// app shall retrieve the auth info based on meterID from DB or a data file
// demo code here only for simple demo
int ret = 0;
return ret;
}
static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
sDebug("processResponse ... ");
rpcFreeCont(pMsg->pCont);
}
static void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
SSyncIO *io = pParent;
SRpcMsg *pTemp;
pTemp = taosAllocateQitem(sizeof(SRpcMsg));
memcpy(pTemp, pMsg, sizeof(SRpcMsg));
sDebug("request is received, type:%d, contLen:%d, item:%p", pMsg->msgType, pMsg->contLen, pTemp);
taosWriteQitem(io->pMsgQ, pTemp);
}
SSyncIO *syncIOCreate() {
SSyncIO *io = (SSyncIO *)malloc(sizeof(SSyncIO));
memset(io, 0, sizeof(*io));
io->pMsgQ = taosOpenQueue();
io->pQset = taosOpenQset();
taosAddIntoQset(io->pQset, io->pMsgQ, NULL);
io->start = doSyncIOStart;
io->stop = doSyncIOStop;
io->ping = doSyncIOPing;
io->onMsg = doSyncIOOnMsg;
io->destroy = doSyncIODestroy;
return io;
}
static int32_t doSyncIOStart(SSyncIO *io) {
taosBlockSIGPIPE();
tsRpcForceTcp = 1;
// cient rpc init
{
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = 0;
rpcInit.label = "SYNC-IO-CLIENT";
rpcInit.numOfThreads = 1;
rpcInit.cfp = processResponse;
rpcInit.sessions = 100;
rpcInit.idleTime = 100;
rpcInit.user = "sync-io";
rpcInit.secret = "sync-io";
rpcInit.ckey = "key";
rpcInit.spi = 0;
rpcInit.connType = TAOS_CONN_CLIENT;
io->clientRpc = rpcOpen(&rpcInit);
if (io->clientRpc == NULL) {
sError("failed to initialize RPC");
return -1;
}
}
// server rpc init
{
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = 38000;
rpcInit.label = "SYNC-IO-SERVER";
rpcInit.numOfThreads = 1;
rpcInit.cfp = processRequestMsg;
rpcInit.sessions = 1000;
rpcInit.idleTime = 2 * 1500;
rpcInit.afp = retrieveAuthInfo;
rpcInit.parent = io;
rpcInit.connType = TAOS_CONN_SERVER;
void *pRpc = rpcOpen(&rpcInit);
if (pRpc == NULL) {
sError("failed to start RPC server");
return -1;
}
}
io->epSet.inUse = 0;
addEpIntoEpSet(&io->epSet, "127.0.0.1", 38000);
// start consumer thread
{
if (pthread_create(&io->tid, NULL, syncConsumer, io) != 0) {
sError("failed to create sync consumer thread since %s", strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
}
// start tmr thread
io->syncTimerManager = taosTmrInit(1000, 50, 10000, "SYNC");
io->syncTimer = taosTmrStart(syncTick, 1000, io, io->syncTimerManager);
return 0;
}
static int32_t doSyncIOStop(SSyncIO *io) {
atomic_store_8(&io->isStart, 0);
pthread_join(io->tid, NULL);
return 0;
}
static int32_t doSyncIOPing(SSyncIO *io) {
SRpcMsg rpcMsg, rspMsg;
rpcMsg.pCont = rpcMallocCont(10);
snprintf(rpcMsg.pCont, 10, "ping");
rpcMsg.contLen = 10;
rpcMsg.handle = NULL;
rpcMsg.msgType = 1;
rpcSendRequest(io->clientRpc, &io->epSet, &rpcMsg, NULL);
return 0;
}
static int32_t doSyncIOOnMsg(struct SSyncIO *io, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { return 0; }
static int32_t doSyncIODestroy(SSyncIO *io) {
int8_t start = atomic_load_8(&io->isStart);
assert(start == 0);
if (io->serverRpc != NULL) {
free(io->serverRpc);
io->serverRpc = NULL;
}
if (io->clientRpc != NULL) {
free(io->clientRpc);
io->clientRpc = NULL;
}
if (io->pMsgQ != NULL) {
free(io->pMsgQ);
io->pMsgQ = NULL;
}
if (io->pQset != NULL) {
free(io->pQset);
io->pQset = NULL;
}
return 0;
}
...@@ -16,12 +16,17 @@ ...@@ -16,12 +16,17 @@
#include <stdint.h> #include <stdint.h>
#include "sync.h" #include "sync.h"
#include "syncInt.h" #include "syncInt.h"
#include "syncRaft.h"
int32_t syncInit() { return 0; } int32_t syncInit() { return 0; }
void syncCleanUp() {} void syncCleanUp() {}
int64_t syncStart(const SSyncInfo* pSyncInfo) { return 0; } int64_t syncStart(const SSyncInfo* pSyncInfo) {
SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo);
assert(pSyncNode != NULL);
return 0;
}
void syncStop(int64_t rid) {} void syncStop(int64_t rid) {}
...@@ -31,4 +36,74 @@ int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak) { r ...@@ -31,4 +36,74 @@ int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak) { r
ESyncState syncGetMyRole(int64_t rid) { return TAOS_SYNC_STATE_LEADER; } ESyncState syncGetMyRole(int64_t rid) { return TAOS_SYNC_STATE_LEADER; }
void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole) {} void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole) {}
\ No newline at end of file
SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
SSyncNode* pSyncNode = (SSyncNode*)malloc(sizeof(SSyncNode));
assert(pSyncNode != NULL);
pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg;
pSyncNode->FpPing = doSyncNodePing;
pSyncNode->FpOnPing = onSyncNodePing;
pSyncNode->FpOnPingReply = onSyncNodePingReply;
pSyncNode->FpRequestVote = doSyncNodeRequestVote;
pSyncNode->FpOnRequestVote = onSyncNodeRequestVote;
pSyncNode->FpOnRequestVoteReply = onSyncNodeRequestVoteReply;
pSyncNode->FpAppendEntries = doSyncNodeAppendEntries;
pSyncNode->FpOnAppendEntries = onSyncNodeAppendEntries;
pSyncNode->FpOnAppendEntriesReply = onSyncNodeAppendEntriesReply;
return pSyncNode;
}
void syncNodeClose(SSyncNode* pSyncNode) {
assert(pSyncNode != NULL);
raftClose(pSyncNode->pRaft);
free(pSyncNode);
}
static int32_t doSyncNodePing(struct SSyncNode* ths, const SyncPing* pMsg) {
int32_t ret = ths->pRaft->FpPing(ths->pRaft, pMsg);
return ret;
}
static int32_t onSyncNodePing(struct SSyncNode* ths, SyncPing* pMsg) {
int32_t ret = ths->pRaft->FpOnPing(ths->pRaft, pMsg);
return ret;
}
static int32_t onSyncNodePingReply(struct SSyncNode* ths, SyncPingReply* pMsg) {
int32_t ret = ths->pRaft->FpOnPingReply(ths->pRaft, pMsg);
return ret;
}
static int32_t doSyncNodeRequestVote(struct SSyncNode* ths, const SyncRequestVote* pMsg) {
int32_t ret = ths->pRaft->FpRequestVote(ths->pRaft, pMsg);
return ret;
}
static int32_t onSyncNodeRequestVote(struct SSyncNode* ths, SyncRequestVote* pMsg) {
int32_t ret = ths->pRaft->FpOnRequestVote(ths->pRaft, pMsg);
return ret;
}
static int32_t onSyncNodeRequestVoteReply(struct SSyncNode* ths, SyncRequestVoteReply* pMsg) {
int32_t ret = ths->pRaft->FpOnRequestVoteReply(ths->pRaft, pMsg);
return ret;
}
static int32_t doSyncNodeAppendEntries(struct SSyncNode* ths, const SyncAppendEntries* pMsg) {
int32_t ret = ths->pRaft->FpAppendEntries(ths->pRaft, pMsg);
return ret;
}
static int32_t onSyncNodeAppendEntries(struct SSyncNode* ths, SyncAppendEntries* pMsg) {
int32_t ret = ths->pRaft->FpOnAppendEntries(ths->pRaft, pMsg);
return ret;
}
static int32_t onSyncNodeAppendEntriesReply(struct SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
int32_t ret = ths->pRaft->FpOnAppendEntriesReply(ths->pRaft, pMsg);
return ret;
}
\ No newline at end of file
...@@ -14,7 +14,6 @@ ...@@ -14,7 +14,6 @@
*/ */
#include "syncMessage.h" #include "syncMessage.h"
#include "sync.h"
#include "syncRaft.h" #include "syncRaft.h"
void onMessage(SRaft *pRaft, void *pMsg) {} void onMessage(SRaft *pRaft, void *pMsg) {}
\ No newline at end of file
...@@ -13,4 +13,4 @@ ...@@ -13,4 +13,4 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "sync.h" #include "syncOnMessage.h"
...@@ -16,6 +16,51 @@ ...@@ -16,6 +16,51 @@
#include "syncRaft.h" #include "syncRaft.h"
#include "sync.h" #include "sync.h"
SRaft* raftOpen(SRaftId raftId, SSyncFSM* pFsm) {
SRaft* pRaft = (SRaft*)malloc(sizeof(SRaft));
assert(pRaft != NULL);
pRaft->id = raftId;
pRaft->pFsm = pFsm;
pRaft->FpPing = doRaftPing;
pRaft->FpOnPing = onRaftPing;
pRaft->FpOnPingReply = onRaftPingReply;
pRaft->FpRequestVote = doRaftRequestVote;
pRaft->FpOnRequestVote = onRaftRequestVote;
pRaft->FpOnRequestVoteReply = onRaftRequestVoteReply;
pRaft->FpAppendEntries = doRaftAppendEntries;
pRaft->FpOnAppendEntries = onRaftAppendEntries;
pRaft->FpOnAppendEntriesReply = onRaftAppendEntriesReply;
return pRaft;
}
void raftClose(SRaft* pRaft) {
assert(pRaft != NULL);
free(pRaft);
}
static int32_t doRaftPing(struct SRaft* ths, const RaftPing* pMsg) { return 0; }
static int32_t onRaftPing(struct SRaft* ths, RaftPing* pMsg) { return 0; }
static int32_t onRaftPingReply(struct SRaft* ths, RaftPingReply* pMsg) { return 0; }
static int32_t doRaftRequestVote(struct SRaft* ths, const RaftRequestVote* pMsg) { return 0; }
static int32_t onRaftRequestVote(struct SRaft* ths, RaftRequestVote* pMsg) { return 0; }
static int32_t onRaftRequestVoteReply(struct SRaft* ths, RaftRequestVoteReply* pMsg) { return 0; }
static int32_t doRaftAppendEntries(struct SRaft* ths, const RaftAppendEntries* pMsg) { return 0; }
static int32_t onRaftAppendEntries(struct SRaft* ths, RaftAppendEntries* pMsg) { return 0; }
static int32_t onRaftAppendEntriesReply(struct SRaft* ths, RaftAppendEntriesReply* pMsg) { return 0; }
int32_t raftPropose(SRaft* pRaft, const SSyncBuffer* pBuf, bool isWeak) { return 0; } int32_t raftPropose(SRaft* pRaft, const SSyncBuffer* pBuf, bool isWeak) { return 0; }
static int raftSendMsg(SRaftId destRaftId, const void* pMsg, const SRaft* pRaft) { return 0; } static int raftSendMsg(SRaftId destRaftId, const void* pMsg, const SRaft* pRaft) { return 0; }
...@@ -13,4 +13,4 @@ ...@@ -13,4 +13,4 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "sync.h" #include "syncRaftEntry.h"
...@@ -14,7 +14,6 @@ ...@@ -14,7 +14,6 @@
*/ */
#include "syncRaftLog.h" #include "syncRaftLog.h"
#include "sync.h"
int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncBuffer* pBuf) { return 0; } int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncBuffer* pBuf) { return 0; }
......
...@@ -14,12 +14,142 @@ ...@@ -14,12 +14,142 @@
*/ */
#include "syncRaftStore.h" #include "syncRaftStore.h"
#include "sync.h" #include "cJSON.h"
int32_t currentTerm(SyncTerm *pCurrentTerm) { return 0; } // to complie success: FileIO interface is modified
int32_t persistCurrentTerm(SyncTerm currentTerm) { return 0; } SRaftStore *raftStoreOpen(const char *path) { return NULL;}
int32_t voteFor(SRaftId *pRaftId) { return 0; } static int32_t raftStoreInit(SRaftStore *pRaftStore) { return 0;}
int32_t persistVoteFor(SRaftId *pRaftId) { return 0; } int32_t raftStoreClose(SRaftStore *pRaftStore) { return 0;}
\ No newline at end of file
int32_t raftStorePersist(SRaftStore *pRaftStore) { return 0;}
static bool raftStoreFileExist(char *path) { return 0;}
int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) { return 0;}
int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) { return 0;}
void raftStorePrint(SRaftStore *pRaftStore) {}
#if 0
SRaftStore *raftStoreOpen(const char *path) {
int32_t ret;
SRaftStore *pRaftStore = malloc(sizeof(SRaftStore));
if (pRaftStore == NULL) {
sError("raftStoreOpen malloc error");
return NULL;
}
memset(pRaftStore, 0, sizeof(*pRaftStore));
snprintf(pRaftStore->path, sizeof(pRaftStore->path), "%s", path);
char storeBuf[RAFT_STORE_BLOCK_SIZE];
memset(storeBuf, 0, sizeof(storeBuf));
if (!raftStoreFileExist(pRaftStore->path)) {
ret = raftStoreInit(pRaftStore);
assert(ret == 0);
}
pRaftStore->fd = taosOpenFileReadWrite(pRaftStore->path);
if (pRaftStore->fd < 0) {
return NULL;
}
int len = taosReadFile(pRaftStore->fd, storeBuf, sizeof(storeBuf));
assert(len == RAFT_STORE_BLOCK_SIZE);
ret = raftStoreDeserialize(pRaftStore, storeBuf, len);
assert(ret == 0);
return pRaftStore;
}
static int32_t raftStoreInit(SRaftStore *pRaftStore) {
pRaftStore->fd = taosOpenFileCreateWrite(pRaftStore->path);
if (pRaftStore->fd < 0) {
return -1;
}
pRaftStore->currentTerm = 0;
pRaftStore->voteFor.addr = 0;
pRaftStore->voteFor.vgId = 0;
int32_t ret = raftStorePersist(pRaftStore);
assert(ret == 0);
taosCloseFile(pRaftStore->fd);
return 0;
}
int32_t raftStoreClose(SRaftStore *pRaftStore) {
taosCloseFile(pRaftStore->fd);
free(pRaftStore);
return 0;
}
int32_t raftStorePersist(SRaftStore *pRaftStore) {
int32_t ret;
char storeBuf[RAFT_STORE_BLOCK_SIZE];
ret = raftStoreSerialize(pRaftStore, storeBuf, sizeof(storeBuf));
assert(ret == 0);
taosLSeekFile(pRaftStore->fd, 0, SEEK_SET);
ret = taosWriteFile(pRaftStore->fd, storeBuf, sizeof(storeBuf));
assert(ret == RAFT_STORE_BLOCK_SIZE);
fsync(pRaftStore->fd);
return 0;
}
static bool raftStoreFileExist(char *path) { return taosStatFile(path, NULL, NULL) >= 0; }
int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) {
cJSON *pRoot = cJSON_CreateObject();
cJSON_AddNumberToObject(pRoot, "current_term", pRaftStore->currentTerm);
cJSON_AddNumberToObject(pRoot, "vote_for_addr", pRaftStore->voteFor.addr);
cJSON_AddNumberToObject(pRoot, "vote_for_vgid", pRaftStore->voteFor.vgId);
char *serialized = cJSON_Print(pRoot);
int len2 = strlen(serialized);
assert(len2 < len);
memset(buf, 0, len);
snprintf(buf, len, "%s", serialized);
free(serialized);
cJSON_Delete(pRoot);
return 0;
}
int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) {
assert(len > 0 && len <= RAFT_STORE_BLOCK_SIZE);
cJSON *pRoot = cJSON_Parse(buf);
cJSON *pCurrentTerm = cJSON_GetObjectItem(pRoot, "current_term");
pRaftStore->currentTerm = pCurrentTerm->valueint;
cJSON *pVoteForAddr = cJSON_GetObjectItem(pRoot, "vote_for_addr");
pRaftStore->voteFor.addr = pVoteForAddr->valueint;
cJSON *pVoteForVgid = cJSON_GetObjectItem(pRoot, "vote_for_vgid");
pRaftStore->voteFor.vgId = pVoteForVgid->valueint;
cJSON_Delete(pRoot);
return 0;
}
void raftStorePrint(SRaftStore *pRaftStore) {
char storeBuf[RAFT_STORE_BLOCK_SIZE];
raftStoreSerialize(pRaftStore, storeBuf, sizeof(storeBuf));
printf("%s\n", storeBuf);
}
#endif
...@@ -13,4 +13,4 @@ ...@@ -13,4 +13,4 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "sync.h" #include "syncReplication.h"
...@@ -14,46 +14,41 @@ ...@@ -14,46 +14,41 @@
*/ */
#include "syncRequestVote.h" #include "syncRequestVote.h"
#include "sync.h"
void requestVote(SRaft *pRaft, const SyncRequestVote *pMsg) { void requestVote(SRaft *pRaft, const SyncRequestVote *pMsg) {
// TLA+ Spec
// TLA+ Spec // RequestVote(i, j) ==
//RequestVote(i, j) == // /\ state[i] = Candidate
// /\ state[i] = Candidate // /\ j \notin votesResponded[i]
// /\ j \notin votesResponded[i] // /\ Send([mtype |-> RequestVoteRequest,
// /\ Send([mtype |-> RequestVoteRequest, // mterm |-> currentTerm[i],
// mterm |-> currentTerm[i], // mlastLogTerm |-> LastTerm(log[i]),
// mlastLogTerm |-> LastTerm(log[i]), // mlastLogIndex |-> Len(log[i]),
// mlastLogIndex |-> Len(log[i]), // msource |-> i,
// msource |-> i, // mdest |-> j])
// mdest |-> j]) // /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
} }
void onRequestVote(SRaft *pRaft, const SyncRequestVote *pMsg) { void onRequestVote(SRaft *pRaft, const SyncRequestVote *pMsg) {
// TLA+ Spec
// TLA+ Spec // HandleRequestVoteRequest(i, j, m) ==
//HandleRequestVoteRequest(i, j, m) == // LET logOk == \/ m.mlastLogTerm > LastTerm(log[i])
// LET logOk == \/ m.mlastLogTerm > LastTerm(log[i]) // \/ /\ m.mlastLogTerm = LastTerm(log[i])
// \/ /\ m.mlastLogTerm = LastTerm(log[i]) // /\ m.mlastLogIndex >= Len(log[i])
// /\ m.mlastLogIndex >= Len(log[i]) // grant == /\ m.mterm = currentTerm[i]
// grant == /\ m.mterm = currentTerm[i] // /\ logOk
// /\ logOk // /\ votedFor[i] \in {Nil, j}
// /\ votedFor[i] \in {Nil, j} // IN /\ m.mterm <= currentTerm[i]
// IN /\ m.mterm <= currentTerm[i] // /\ \/ grant /\ votedFor' = [votedFor EXCEPT ![i] = j]
// /\ \/ grant /\ votedFor' = [votedFor EXCEPT ![i] = j] // \/ ~grant /\ UNCHANGED votedFor
// \/ ~grant /\ UNCHANGED votedFor // /\ Reply([mtype |-> RequestVoteResponse,
// /\ Reply([mtype |-> RequestVoteResponse, // mterm |-> currentTerm[i],
// mterm |-> currentTerm[i], // mvoteGranted |-> grant,
// mvoteGranted |-> grant, // \* mlog is used just for the `elections' history variable for
// \* mlog is used just for the `elections' history variable for // \* the proof. It would not exist in a real implementation.
// \* the proof. It would not exist in a real implementation. // mlog |-> log[i],
// mlog |-> log[i], // msource |-> i,
// msource |-> i, // mdest |-> j],
// mdest |-> j], // m)
// m) // /\ UNCHANGED <<state, currentTerm, candidateVars, leaderVars, logVars>>
// /\ UNCHANGED <<state, currentTerm, candidateVars, leaderVars, logVars>>
} }
...@@ -14,25 +14,22 @@ ...@@ -14,25 +14,22 @@
*/ */
#include "syncRequestVoteReply.h" #include "syncRequestVoteReply.h"
#include "sync.h"
void onRequestVoteReply(SRaft *pRaft, const SyncRequestVoteReply *pMsg) { void onRequestVoteReply(SRaft *pRaft, const SyncRequestVoteReply *pMsg) {
// TLA+ Spec
// TLA+ Spec // HandleRequestVoteResponse(i, j, m) ==
//HandleRequestVoteResponse(i, j, m) == // \* This tallies votes even when the current state is not Candidate, but
// \* This tallies votes even when the current state is not Candidate, but // \* they won't be looked at, so it doesn't matter.
// \* they won't be looked at, so it doesn't matter. // /\ m.mterm = currentTerm[i]
// /\ m.mterm = currentTerm[i] // /\ votesResponded' = [votesResponded EXCEPT ![i] =
// /\ votesResponded' = [votesResponded EXCEPT ![i] = // votesResponded[i] \cup {j}]
// votesResponded[i] \cup {j}] // /\ \/ /\ m.mvoteGranted
// /\ \/ /\ m.mvoteGranted // /\ votesGranted' = [votesGranted EXCEPT ![i] =
// /\ votesGranted' = [votesGranted EXCEPT ![i] = // votesGranted[i] \cup {j}]
// votesGranted[i] \cup {j}] // /\ voterLog' = [voterLog EXCEPT ![i] =
// /\ voterLog' = [voterLog EXCEPT ![i] = // voterLog[i] @@ (j :> m.mlog)]
// voterLog[i] @@ (j :> m.mlog)] // \/ /\ ~m.mvoteGranted
// \/ /\ ~m.mvoteGranted // /\ UNCHANGED <<votesGranted, voterLog>>
// /\ UNCHANGED <<votesGranted, voterLog>> // /\ Discard(m)
// /\ Discard(m) // /\ UNCHANGED <<serverVars, votedFor, leaderVars, logVars>>
// /\ UNCHANGED <<serverVars, votedFor, leaderVars, logVars>>
} }
...@@ -14,7 +14,6 @@ ...@@ -14,7 +14,6 @@
*/ */
#include "syncSnapshot.h" #include "syncSnapshot.h"
#include "sync.h"
#include "syncRaft.h" #include "syncRaft.h"
int32_t takeSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) { return 0; } int32_t takeSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) { return 0; }
......
...@@ -14,6 +14,5 @@ ...@@ -14,6 +14,5 @@
*/ */
#include "syncTimeout.h" #include "syncTimeout.h"
#include "sync.h"
void onTimeout(SRaft *pRaft, void *pMsg) {} void onTimeout(SRaft *pRaft, void *pMsg) {}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "syncVoteMgr.h"
add_executable(syncTest "")
add_executable(syncEnvTest "")
add_executable(syncPingTest "")
target_sources(syncTest
PRIVATE
"syncTest.cpp"
)
target_sources(syncEnvTest
PRIVATE
"syncEnvTest.cpp"
)
target_sources(syncPingTest
PRIVATE
"syncPingTest.cpp"
)
target_include_directories(syncTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncEnvTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncPingTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_link_libraries(syncTest
sync
gtest_main
)
target_link_libraries(syncEnvTest
sync
gtest_main
)
target_link_libraries(syncPingTest
sync
gtest_main
)
enable_testing()
add_test(
NAME sync_test
COMMAND syncTest
)
#include "syncEnv.h"
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
void doSync() {
SSyncInfo syncInfo;
syncInfo.vgId = 1;
SSyncCfg* pCfg = &syncInfo.syncCfg;
pCfg->replicaNum = 3;
pCfg->nodeInfo[0].nodePort = 7010;
taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
pCfg->nodeInfo[1].nodePort = 7110;
taosGetFqdn(pCfg->nodeInfo[1].nodeFqdn);
pCfg->nodeInfo[2].nodePort = 7210;
taosGetFqdn(pCfg->nodeInfo[2].nodeFqdn);
SSyncNode* pSyncNode = syncNodeOpen(&syncInfo);
assert(pSyncNode != NULL);
}
int main() {
//taosInitLog((char*)"syncEnvTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
logTest();
int32_t ret = syncIOStart();
assert(ret == 0);
ret = syncEnvStart();
assert(ret == 0);
doSync();
while (1) {
taosMsleep(1000);
}
return 0;
}
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
void doSync() {
SSyncFSM* pFsm;
SSyncInfo syncInfo;
syncInfo.vgId = 1;
syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_sync_ping");
SSyncCfg* pCfg = &syncInfo.syncCfg;
pCfg->myIndex = 0;
pCfg->replicaNum = 3;
pCfg->nodeInfo[0].nodePort = 7010;
taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
pCfg->nodeInfo[1].nodePort = 7110;
taosGetFqdn(pCfg->nodeInfo[1].nodeFqdn);
pCfg->nodeInfo[2].nodePort = 7210;
taosGetFqdn(pCfg->nodeInfo[2].nodeFqdn);
SSyncNode* pSyncNode = syncNodeOpen(&syncInfo);
assert(pSyncNode != NULL);
gSyncIO->FpOnPing = pSyncNode->FpOnPing;
gSyncIO->pSyncNode = pSyncNode;
}
int main() {
//taosInitLog((char*)"syncPingTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
logTest();
int32_t ret = syncIOStart();
assert(ret == 0);
ret = syncEnvStart();
assert(ret == 0);
doSync();
while (1) {
taosMsleep(1000);
}
return 0;
}
#include <stdio.h> #include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
int main() { void *pingFunc(void *param) {
printf("test \n"); SSyncIO *io = (SSyncIO *)param;
return 0; while (1) {
sDebug("io->ping");
io->ping(io);
sleep(1);
}
return NULL;
} }
int main() {
//taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
sTrace("sync log test: trace");
sDebug("sync log test: debug");
sInfo("sync log test: info");
sWarn("sync log test: warn");
sError("sync log test: error");
sFatal("sync log test: fatal");
SRaftStore *pRaftStore = raftStoreOpen("./raft_store.json");
assert(pRaftStore != NULL);
raftStorePrint(pRaftStore);
pRaftStore->currentTerm = 100;
pRaftStore->voteFor.addr = 200;
pRaftStore->voteFor.vgId = 300;
raftStorePrint(pRaftStore);
raftStorePersist(pRaftStore);
sDebug("sync test");
SSyncIO *syncIO = syncIOCreate();
assert(syncIO != NULL);
syncIO->start(syncIO);
sleep(2);
pthread_t tid;
pthread_create(&tid, NULL, pingFunc, syncIO);
while (1) {
sleep(1);
}
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册