Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
e8780ceb
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
e8780ceb
编写于
2月 26, 2022
作者:
M
Minghao Li
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add sync code
上级
b6399528
变更
40
隐藏空白更改
内联
并排
Showing
40 changed file
with
127 addition
and
858 deletion
+127
-858
source/libs/sync/CMakeLists.txt
source/libs/sync/CMakeLists.txt
+1
-5
source/libs/sync/inc/syncAppendEntries.h
source/libs/sync/inc/syncAppendEntries.h
+0
-1
source/libs/sync/inc/syncAppendEntriesReply.h
source/libs/sync/inc/syncAppendEntriesReply.h
+0
-1
source/libs/sync/inc/syncElection.h
source/libs/sync/inc/syncElection.h
+0
-1
source/libs/sync/inc/syncEnv.h
source/libs/sync/inc/syncEnv.h
+0
-47
source/libs/sync/inc/syncIO.h
source/libs/sync/inc/syncIO.h
+0
-68
source/libs/sync/inc/syncInt.h
source/libs/sync/inc/syncInt.h
+0
-10
source/libs/sync/inc/syncMessage.h
source/libs/sync/inc/syncMessage.h
+11
-9
source/libs/sync/inc/syncRaft.h
source/libs/sync/inc/syncRaft.h
+40
-5
source/libs/sync/inc/syncRaftEntry.h
source/libs/sync/inc/syncRaftEntry.h
+4
-4
source/libs/sync/inc/syncRaftLog.h
source/libs/sync/inc/syncRaftLog.h
+1
-1
source/libs/sync/inc/syncRaftStore.h
source/libs/sync/inc/syncRaftStore.h
+5
-24
source/libs/sync/inc/syncReplication.h
source/libs/sync/inc/syncReplication.h
+0
-1
source/libs/sync/inc/syncRequestVote.h
source/libs/sync/inc/syncRequestVote.h
+0
-1
source/libs/sync/inc/syncRequestVoteReply.h
source/libs/sync/inc/syncRequestVoteReply.h
+0
-1
source/libs/sync/inc/syncSnapshot.h
source/libs/sync/inc/syncSnapshot.h
+1
-1
source/libs/sync/inc/syncTimeout.h
source/libs/sync/inc/syncTimeout.h
+0
-1
source/libs/sync/inc/syncVoteMgr.h
source/libs/sync/inc/syncVoteMgr.h
+0
-33
source/libs/sync/src/syncAppendEntries.c
source/libs/sync/src/syncAppendEntries.c
+1
-0
source/libs/sync/src/syncAppendEntriesReply.c
source/libs/sync/src/syncAppendEntriesReply.c
+1
-0
source/libs/sync/src/syncElection.c
source/libs/sync/src/syncElection.c
+1
-1
source/libs/sync/src/syncEnv.c
source/libs/sync/src/syncEnv.c
+0
-36
source/libs/sync/src/syncIO.c
source/libs/sync/src/syncIO.c
+0
-245
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+3
-11
source/libs/sync/src/syncMessage.c
source/libs/sync/src/syncMessage.c
+1
-0
source/libs/sync/src/syncOnMessage.c
source/libs/sync/src/syncOnMessage.c
+1
-1
source/libs/sync/src/syncRaft.c
source/libs/sync/src/syncRaft.c
+41
-0
source/libs/sync/src/syncRaftEntry.c
source/libs/sync/src/syncRaftEntry.c
+1
-1
source/libs/sync/src/syncRaftLog.c
source/libs/sync/src/syncRaftLog.c
+1
-0
source/libs/sync/src/syncRaftStore.c
source/libs/sync/src/syncRaftStore.c
+5
-111
source/libs/sync/src/syncReplication.c
source/libs/sync/src/syncReplication.c
+1
-1
source/libs/sync/src/syncRequestVote.c
source/libs/sync/src/syncRequestVote.c
+1
-0
source/libs/sync/src/syncRequestVoteReply.c
source/libs/sync/src/syncRequestVoteReply.c
+1
-0
source/libs/sync/src/syncSnapshot.c
source/libs/sync/src/syncSnapshot.c
+1
-0
source/libs/sync/src/syncTimeout.c
source/libs/sync/src/syncTimeout.c
+1
-0
source/libs/sync/src/syncVoteMgr.c
source/libs/sync/src/syncVoteMgr.c
+0
-16
source/libs/sync/test/CMakeLists.txt
source/libs/sync/test/CMakeLists.txt
+0
-55
source/libs/sync/test/syncEnvTest.cpp
source/libs/sync/test/syncEnvTest.cpp
+0
-56
source/libs/sync/test/syncPingTest.cpp
source/libs/sync/test/syncPingTest.cpp
+0
-57
source/libs/sync/test/syncTest.cpp
source/libs/sync/test/syncTest.cpp
+3
-53
未找到文件。
source/libs/sync/CMakeLists.txt
浏览文件 @
e8780ceb
...
...
@@ -13,8 +13,4 @@ target_include_directories(
sync
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/libs/sync"
PRIVATE
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/inc"
)
if
(
${
BUILD_TEST
}
)
add_subdirectory
(
test
)
endif
(
${
BUILD_TEST
}
)
)
\ No newline at end of file
source/libs/sync/inc/syncAppendEntries.h
浏览文件 @
e8780ceb
...
...
@@ -23,7 +23,6 @@ extern "C" {
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "syncInt.h"
#include "syncMessage.h"
#include "syncRaft.h"
#include "taosdef.h"
...
...
source/libs/sync/inc/syncAppendEntriesReply.h
浏览文件 @
e8780ceb
...
...
@@ -23,7 +23,6 @@ extern "C" {
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "syncInt.h"
#include "syncMessage.h"
#include "syncRaft.h"
#include "taosdef.h"
...
...
source/libs/sync/inc/syncElection.h
浏览文件 @
e8780ceb
...
...
@@ -23,7 +23,6 @@ extern "C" {
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "syncInt.h"
#include "taosdef.h"
#ifdef __cplusplus
...
...
source/libs/sync/inc/syncEnv.h
已删除
100644 → 0
浏览文件 @
b6399528
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_LIBS_SYNC_ENV_H
#define _TD_LIBS_SYNC_ENV_H
#ifdef __cplusplus
extern
"C"
{
#endif
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "syncInt.h"
#include "taosdef.h"
#include "trpc.h"
typedef
struct
SSyncEnv
{
void
*
pTimer
;
void
*
pTimerManager
;
}
SSyncEnv
;
int32_t
syncEnvStart
();
int32_t
syncEnvStop
();
static
int32_t
doSyncEnvStart
(
SSyncEnv
*
pSyncEnv
);
static
int32_t
doSyncEnvStop
(
SSyncEnv
*
pSyncEnv
);
#ifdef __cplusplus
}
#endif
#endif
/*_TD_LIBS_SYNC_ENV_H*/
source/libs/sync/inc/syncIO.h
已删除
100644 → 0
浏览文件 @
b6399528
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_LIBS_IO_H
#define _TD_LIBS_IO_H
#ifdef __cplusplus
extern
"C"
{
#endif
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "os.h"
#include "syncInt.h"
#include "taosdef.h"
#include "tqueue.h"
#include "trpc.h"
typedef
struct
SSyncIO
{
void
*
serverRpc
;
void
*
clientRpc
;
STaosQueue
*
pMsgQ
;
STaosQset
*
pQset
;
pthread_t
tid
;
int8_t
isStart
;
SEpSet
epSet
;
void
*
syncTimer
;
void
*
syncTimerManager
;
int32_t
(
*
start
)(
struct
SSyncIO
*
ths
);
int32_t
(
*
stop
)(
struct
SSyncIO
*
ths
);
int32_t
(
*
ping
)(
struct
SSyncIO
*
ths
);
int32_t
(
*
onMessage
)(
struct
SSyncIO
*
ths
,
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
int32_t
(
*
destroy
)(
struct
SSyncIO
*
ths
);
}
SSyncIO
;
int32_t
syncIOStart
();
int32_t
syncIOStop
();
SSyncIO
*
syncIOCreate
();
static
int32_t
doSyncIOStart
(
SSyncIO
*
io
);
static
int32_t
doSyncIOStop
(
SSyncIO
*
io
);
static
int32_t
doSyncIOPing
(
SSyncIO
*
io
);
static
int32_t
doSyncIOOnMessage
(
struct
SSyncIO
*
io
,
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
static
int32_t
doSyncIODestroy
(
SSyncIO
*
io
);
#ifdef __cplusplus
}
#endif
#endif
/*_TD_LIBS_IO_H*/
source/libs/sync/inc/syncInt.h
浏览文件 @
e8780ceb
...
...
@@ -23,11 +23,7 @@ extern "C" {
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "sync.h"
#include "taosdef.h"
#include "tlog.h"
extern
int32_t
sDebugFlag
;
#define sFatal(...) \
{ \
...
...
@@ -76,12 +72,6 @@ typedef struct SSyncNode {
int64_t
rid
;
}
SSyncNode
;
SSyncNode
*
syncNodeStart
(
const
SSyncInfo
*
pSyncInfo
);
void
syncNodeStop
(
SSyncNode
*
pSyncNode
);
// int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak);
int32_t
syncNodeForwardToPeer
(
SSyncNode
*
pSyncNode
,
const
SSyncBuffer
*
pBuf
,
bool
isWeak
);
#ifdef __cplusplus
}
#endif
...
...
source/libs/sync/inc/syncMessage.h
浏览文件 @
e8780ceb
...
...
@@ -23,7 +23,7 @@ extern "C" {
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "sync
Int
.h"
#include "sync.h"
#include "syncRaftEntry.h"
#include "taosdef.h"
...
...
@@ -41,26 +41,28 @@ typedef enum ESyncMessageType {
typedef
struct
SyncPing
{
ESyncMessageType
msgType
;
const
SSyncBuffer
*
pData
;
}
SyncPing
;
}
SyncPing
,
RaftPing
;
typedef
struct
SyncPingReply
{
ESyncMessageType
msgType
;
const
SSyncBuffer
*
pData
;
}
SyncPingReply
;
}
SyncPingReply
,
RaftPingReply
;
typedef
struct
SyncClientRequest
{
ESyncMessageType
msgType
;
const
SSyncBuffer
*
pData
;
int64_t
seqNum
;
bool
isWeak
;
}
SyncClientRequest
;
}
SyncClientRequest
,
RaftClientRequest
;
typedef
struct
SyncClientRequestReply
{
ESyncMessageType
msgType
;
int32_t
errCode
;
const
SSyncBuffer
*
pErrMsg
;
const
SSyncBuffer
*
pLeaderHint
;
}
SyncClientRequestReply
;
}
SyncClientRequestReply
,
RaftClientRequestReply
;
typedef
struct
SyncRequestVote
{
ESyncMessageType
msgType
;
...
...
@@ -69,7 +71,7 @@ typedef struct SyncRequestVote {
SyncGroupId
vgId
;
SyncIndex
lastLogIndex
;
SyncTerm
lastLogTerm
;
}
SyncRequestVote
;
}
SyncRequestVote
,
RaftRequestVote
;
typedef
struct
SyncRequestVoteReply
{
ESyncMessageType
msgType
;
...
...
@@ -77,7 +79,7 @@ typedef struct SyncRequestVoteReply {
SyncNodeId
nodeId
;
SyncGroupId
vgId
;
bool
voteGranted
;
}
SyncRequestVoteReply
;
}
SyncRequestVoteReply
,
RaftRequestVoteReply
;
typedef
struct
SyncAppendEntries
{
ESyncMessageType
msgType
;
...
...
@@ -88,7 +90,7 @@ typedef struct SyncAppendEntries {
int32_t
entryCount
;
SSyncRaftEntry
*
logEntries
;
SyncIndex
commitIndex
;
}
SyncAppendEntries
;
}
SyncAppendEntries
,
RaftAppendEntries
;
typedef
struct
SyncAppendEntriesReply
{
ESyncMessageType
msgType
;
...
...
@@ -96,7 +98,7 @@ typedef struct SyncAppendEntriesReply {
SyncNodeId
nodeId
;
bool
success
;
SyncIndex
matchIndex
;
}
SyncAppendEntriesReply
;
}
SyncAppendEntriesReply
,
RaftAppendEntriesReply
;
#ifdef __cplusplus
}
...
...
source/libs/sync/inc/syncRaft.h
浏览文件 @
e8780ceb
...
...
@@ -23,24 +23,59 @@ extern "C" {
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "sync
Int
.h"
#include "sync.h"
#include "syncMessage.h"
#include "taosdef.h"
typedef
struct
SRaftId
{
SyncNodeId
addr
;
SyncNodeId
nodeId
;
SyncGroupId
vgId
;
}
SRaftId
;
typedef
struct
SRaft
{
SRaftId
id
;
void
*
data
;
SSyncLogStore
*
logStore
;
SStateMgr
*
stateManager
;
SSyncFSM
*
syncFsm
;
int32_t
(
*
FpPing
)(
struct
SRaft
*
ths
,
const
RaftPing
*
pMsg
);
int32_t
(
*
FpOnPing
)(
struct
SRaft
*
ths
,
RaftPing
*
pMsg
);
int32_t
(
*
FpOnPingReply
)(
struct
SRaft
*
ths
,
RaftPingReply
*
pMsg
);
int32_t
(
*
FpRequestVote
)(
struct
SRaft
*
ths
,
const
RaftRequestVote
*
pMsg
);
int32_t
(
*
FpOnRequestVote
)(
struct
SRaft
*
ths
,
RaftRequestVote
*
pMsg
);
int32_t
(
*
FpOnRequestVoteReply
)(
struct
SRaft
*
ths
,
RaftRequestVoteReply
*
pMsg
);
int32_t
(
*
FpAppendEntries
)(
struct
SRaft
*
ths
,
const
RaftAppendEntries
*
pMsg
);
int32_t
(
*
FpOnAppendEntries
)(
struct
SRaft
*
ths
,
RaftAppendEntries
*
pMsg
);
int32_t
(
*
FpOnAppendEntriesReply
)(
struct
SRaft
*
ths
,
RaftAppendEntriesReply
*
pMsg
);
}
SRaft
;
SRaft
*
raftCreate
(
SRaftId
raftId
,
void
*
data
);
static
int32_t
doRaftPing
(
struct
SRaft
*
ths
,
const
RaftPing
*
pMsg
);
static
int32_t
onRaftPing
(
struct
SRaft
*
ths
,
RaftPing
*
pMsg
);
static
int32_t
onRaftPingReply
(
struct
SRaft
*
ths
,
RaftPingReply
*
pMsg
);
static
int32_t
doRaftRequestVote
(
struct
SRaft
*
ths
,
const
RaftRequestVote
*
pMsg
);
static
int32_t
onRaftRequestVote
(
struct
SRaft
*
ths
,
RaftRequestVote
*
pMsg
);
static
int32_t
onRaftRequestVoteReply
(
struct
SRaft
*
ths
,
RaftRequestVoteReply
*
pMsg
);
static
int32_t
doRaftAppendEntries
(
struct
SRaft
*
ths
,
const
RaftAppendEntries
*
pMsg
);
static
int32_t
onRaftAppendEntries
(
struct
SRaft
*
ths
,
RaftAppendEntries
*
pMsg
);
static
int32_t
onRaftAppendEntriesReply
(
struct
SRaft
*
ths
,
RaftAppendEntriesReply
*
pMsg
);
int32_t
raftPropose
(
SRaft
*
pRaft
,
const
SSyncBuffer
*
pBuf
,
bool
isWeak
);
static
int
raftSendMsg
(
SRaftId
destRaftId
,
const
void
*
pMsg
,
const
SRaft
*
pRaft
);
...
...
source/libs/sync/inc/syncRaftEntry.h
浏览文件 @
e8780ceb
...
...
@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_LIBS_
SYNC_RAFT_ENTRY
_H
#define _TD_LIBS_
SYNC_RAFT_ENTRY
_H
#ifndef _TD_LIBS_
TPL
_H
#define _TD_LIBS_
TPL
_H
#ifdef __cplusplus
extern
"C"
{
...
...
@@ -23,7 +23,7 @@ extern "C" {
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "sync
Int
.h"
#include "sync.h"
#include "taosdef.h"
typedef
struct
SSyncRaftEntry
{
...
...
@@ -37,4 +37,4 @@ typedef struct SSyncRaftEntry {
}
#endif
#endif
/*_TD_LIBS_
SYNC_RAFT_ENTRY
_H*/
#endif
/*_TD_LIBS_
TPL
_H*/
source/libs/sync/inc/syncRaftLog.h
浏览文件 @
e8780ceb
...
...
@@ -23,7 +23,7 @@ extern "C" {
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "sync
Int
.h"
#include "sync.h"
#include "taosdef.h"
int32_t
raftLogAppendEntry
(
struct
SSyncLogStore
*
pLogStore
,
SSyncBuffer
*
pBuf
);
...
...
source/libs/sync/inc/syncRaftStore.h
浏览文件 @
e8780ceb
...
...
@@ -23,36 +23,17 @@ extern "C" {
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "cJSON.h"
#include "syncInt.h"
#include "sync.h"
#include "syncRaft.h"
#include "taosdef.h"
#define RAFT_STORE_BLOCK_SIZE 512
#define RAFT_STORE_PATH_LEN 128
int32_t
currentTerm
(
SyncTerm
*
pCurrentTerm
);
typedef
struct
SRaftStore
{
SyncTerm
currentTerm
;
SRaftId
voteFor
;
FileFd
fd
;
char
path
[
RAFT_STORE_PATH_LEN
];
}
SRaftStore
;
int32_t
persistCurrentTerm
(
SyncTerm
currentTerm
);
SRaftStore
*
raftStoreOpen
(
const
char
*
path
);
int32_t
voteFor
(
SRaftId
*
pRaftId
);
static
int32_t
raftStoreInit
(
SRaftStore
*
pRaftStore
);
int32_t
raftStoreClose
(
SRaftStore
*
pRaftStore
);
int32_t
raftStorePersist
(
SRaftStore
*
pRaftStore
);
static
bool
raftStoreFileExist
(
char
*
path
);
int32_t
raftStoreSerialize
(
SRaftStore
*
pRaftStore
,
char
*
buf
,
size_t
len
);
int32_t
raftStoreDeserialize
(
SRaftStore
*
pRaftStore
,
char
*
buf
,
size_t
len
);
void
raftStorePrint
(
SRaftStore
*
pRaftStore
);
int32_t
persistVoteFor
(
SRaftId
*
pRaftId
);
#ifdef __cplusplus
}
...
...
source/libs/sync/inc/syncReplication.h
浏览文件 @
e8780ceb
...
...
@@ -23,7 +23,6 @@ extern "C" {
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "syncInt.h"
#include "taosdef.h"
#ifdef __cplusplus
...
...
source/libs/sync/inc/syncRequestVote.h
浏览文件 @
e8780ceb
...
...
@@ -23,7 +23,6 @@ extern "C" {
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "syncInt.h"
#include "syncMessage.h"
#include "syncRaft.h"
#include "taosdef.h"
...
...
source/libs/sync/inc/syncRequestVoteReply.h
浏览文件 @
e8780ceb
...
...
@@ -23,7 +23,6 @@ extern "C" {
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "syncInt.h"
#include "syncMessage.h"
#include "syncRaft.h"
#include "taosdef.h"
...
...
source/libs/sync/inc/syncSnapshot.h
浏览文件 @
e8780ceb
...
...
@@ -23,7 +23,7 @@ extern "C" {
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "sync
Int
.h"
#include "sync.h"
#include "syncRaft.h"
#include "taosdef.h"
...
...
source/libs/sync/inc/syncTimeout.h
浏览文件 @
e8780ceb
...
...
@@ -23,7 +23,6 @@ extern "C" {
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "syncInt.h"
#include "syncMessage.h"
#include "syncRaft.h"
#include "taosdef.h"
...
...
source/libs/sync/inc/syncVoteMgr.h
已删除
100644 → 0
浏览文件 @
b6399528
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_LIBS_SYNC_VOTG_MGR_H
#define _TD_LIBS_SYNC_VOTG_MGR_H
#ifdef __cplusplus
extern
"C"
{
#endif
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "syncInt.h"
#include "taosdef.h"
#ifdef __cplusplus
}
#endif
#endif
/*_TD_LIBS_SYNC_VOTG_MGR_H*/
source/libs/sync/src/syncAppendEntries.c
浏览文件 @
e8780ceb
...
...
@@ -14,6 +14,7 @@
*/
#include "syncAppendEntries.h"
#include "sync.h"
void
appendEntries
(
SRaft
*
pRaft
,
const
SyncAppendEntries
*
pMsg
)
{
// TLA+ Spec
...
...
source/libs/sync/src/syncAppendEntriesReply.c
浏览文件 @
e8780ceb
...
...
@@ -14,6 +14,7 @@
*/
#include "syncAppendEntriesReply.h"
#include "sync.h"
void
onAppendEntriesReply
(
SRaft
*
pRaft
,
const
SyncAppendEntriesReply
*
pMsg
)
{
// TLA+ Spec
...
...
source/libs/sync/src/syncElection.c
浏览文件 @
e8780ceb
...
...
@@ -13,4 +13,4 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "sync
Election
.h"
#include "sync.h"
source/libs/sync/src/syncEnv.c
已删除
100644 → 0
浏览文件 @
b6399528
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "syncEnv.h"
#include <assert.h>
SSyncEnv
*
gSyncEnv
=
NULL
;
int32_t
syncEnvStart
()
{
int32_t
ret
;
gSyncEnv
=
(
SSyncEnv
*
)
malloc
(
sizeof
(
SSyncEnv
));
assert
(
gSyncEnv
!=
NULL
);
ret
=
doSyncEnvStart
(
gSyncEnv
);
return
ret
;
}
int32_t
syncEnvStop
()
{
int32_t
ret
=
doSyncEnvStop
(
gSyncEnv
);
return
ret
;
}
static
int32_t
doSyncEnvStart
(
SSyncEnv
*
pSyncEnv
)
{
return
0
;
}
static
int32_t
doSyncEnvStop
(
SSyncEnv
*
pSyncEnv
)
{
return
0
;
}
source/libs/sync/src/syncIO.c
已删除
100644 → 0
浏览文件 @
b6399528
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "syncIO.h"
#include <tep.h>
#include "syncOnMessage.h"
#include "tglobal.h"
#include "ttimer.h"
#include "tutil.h"
int32_t
syncIOStart
()
{
return
0
;
}
int32_t
syncIOStop
()
{
return
0
;
}
static
void
syncTick
(
void
*
param
,
void
*
tmrId
)
{
SSyncIO
*
io
=
(
SSyncIO
*
)
param
;
sDebug
(
"syncTick ... "
);
SRpcMsg
rpcMsg
;
rpcMsg
.
pCont
=
rpcMallocCont
(
10
);
snprintf
(
rpcMsg
.
pCont
,
10
,
"TICK"
);
rpcMsg
.
contLen
=
10
;
rpcMsg
.
handle
=
NULL
;
rpcMsg
.
msgType
=
2
;
SRpcMsg
*
pTemp
;
pTemp
=
taosAllocateQitem
(
sizeof
(
SRpcMsg
));
memcpy
(
pTemp
,
&
rpcMsg
,
sizeof
(
SRpcMsg
));
taosWriteQitem
(
io
->
pMsgQ
,
pTemp
);
io
->
syncTimer
=
taosTmrStart
(
syncTick
,
1000
,
io
,
io
->
syncTimerManager
);
}
void
*
syncConsumer
(
void
*
param
)
{
SSyncIO
*
io
=
param
;
STaosQall
*
qall
;
SRpcMsg
*
pRpcMsg
,
rpcMsg
;
int
type
;
qall
=
taosAllocateQall
();
while
(
1
)
{
int
numOfMsgs
=
taosReadAllQitemsFromQset
(
io
->
pQset
,
qall
,
NULL
,
NULL
);
sDebug
(
"%d sync-io msgs are received"
,
numOfMsgs
);
if
(
numOfMsgs
<=
0
)
break
;
for
(
int
i
=
0
;
i
<
numOfMsgs
;
++
i
)
{
taosGetQitem
(
qall
,
(
void
**
)
&
pRpcMsg
);
sDebug
(
"sync-io recv type:%d msg:%s"
,
pRpcMsg
->
msgType
,
(
char
*
)(
pRpcMsg
->
pCont
));
}
taosResetQitems
(
qall
);
for
(
int
i
=
0
;
i
<
numOfMsgs
;
++
i
)
{
taosGetQitem
(
qall
,
(
void
**
)
&
pRpcMsg
);
rpcFreeCont
(
pRpcMsg
->
pCont
);
if
(
pRpcMsg
->
handle
!=
NULL
)
{
int
msgSize
=
128
;
memset
(
&
rpcMsg
,
0
,
sizeof
(
rpcMsg
));
rpcMsg
.
pCont
=
rpcMallocCont
(
msgSize
);
rpcMsg
.
contLen
=
msgSize
;
rpcMsg
.
handle
=
pRpcMsg
->
handle
;
rpcMsg
.
code
=
0
;
rpcSendResponse
(
&
rpcMsg
);
}
taosFreeQitem
(
pRpcMsg
);
}
}
taosFreeQall
(
qall
);
return
NULL
;
}
static
int
retrieveAuthInfo
(
void
*
parent
,
char
*
meterId
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
)
{
// app shall retrieve the auth info based on meterID from DB or a data file
// demo code here only for simple demo
int
ret
=
0
;
return
ret
;
}
static
void
processResponse
(
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
sDebug
(
"processResponse ... "
);
rpcFreeCont
(
pMsg
->
pCont
);
}
static
void
processRequestMsg
(
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
SSyncIO
*
io
=
pParent
;
SRpcMsg
*
pTemp
;
pTemp
=
taosAllocateQitem
(
sizeof
(
SRpcMsg
));
memcpy
(
pTemp
,
pMsg
,
sizeof
(
SRpcMsg
));
sDebug
(
"request is received, type:%d, contLen:%d, item:%p"
,
pMsg
->
msgType
,
pMsg
->
contLen
,
pTemp
);
taosWriteQitem
(
io
->
pMsgQ
,
pTemp
);
}
SSyncIO
*
syncIOCreate
()
{
SSyncIO
*
io
=
(
SSyncIO
*
)
malloc
(
sizeof
(
SSyncIO
));
memset
(
io
,
0
,
sizeof
(
*
io
));
io
->
pMsgQ
=
taosOpenQueue
();
io
->
pQset
=
taosOpenQset
();
taosAddIntoQset
(
io
->
pQset
,
io
->
pMsgQ
,
NULL
);
io
->
start
=
doSyncIOStart
;
io
->
stop
=
doSyncIOStop
;
io
->
ping
=
doSyncIOPing
;
io
->
onMessage
=
doSyncIOOnMessage
;
io
->
destroy
=
doSyncIODestroy
;
return
io
;
}
static
int32_t
doSyncIOStart
(
SSyncIO
*
io
)
{
taosBlockSIGPIPE
();
tsRpcForceTcp
=
1
;
// cient rpc init
{
SRpcInit
rpcInit
;
memset
(
&
rpcInit
,
0
,
sizeof
(
rpcInit
));
rpcInit
.
localPort
=
0
;
rpcInit
.
label
=
"SYNC-IO-CLIENT"
;
rpcInit
.
numOfThreads
=
1
;
rpcInit
.
cfp
=
processResponse
;
rpcInit
.
sessions
=
100
;
rpcInit
.
idleTime
=
100
;
rpcInit
.
user
=
"sync-io"
;
rpcInit
.
secret
=
"sync-io"
;
rpcInit
.
ckey
=
"key"
;
rpcInit
.
spi
=
0
;
rpcInit
.
connType
=
TAOS_CONN_CLIENT
;
io
->
clientRpc
=
rpcOpen
(
&
rpcInit
);
if
(
io
->
clientRpc
==
NULL
)
{
sError
(
"failed to initialize RPC"
);
return
-
1
;
}
}
// server rpc init
{
SRpcInit
rpcInit
;
memset
(
&
rpcInit
,
0
,
sizeof
(
rpcInit
));
rpcInit
.
localPort
=
38000
;
rpcInit
.
label
=
"SYNC-IO-SERVER"
;
rpcInit
.
numOfThreads
=
1
;
rpcInit
.
cfp
=
processRequestMsg
;
rpcInit
.
sessions
=
1000
;
rpcInit
.
idleTime
=
2
*
1500
;
rpcInit
.
afp
=
retrieveAuthInfo
;
rpcInit
.
parent
=
io
;
rpcInit
.
connType
=
TAOS_CONN_SERVER
;
void
*
pRpc
=
rpcOpen
(
&
rpcInit
);
if
(
pRpc
==
NULL
)
{
sError
(
"failed to start RPC server"
);
return
-
1
;
}
}
io
->
epSet
.
inUse
=
0
;
addEpIntoEpSet
(
&
io
->
epSet
,
"127.0.0.1"
,
38000
);
// start consumer thread
{
if
(
pthread_create
(
&
io
->
tid
,
NULL
,
syncConsumer
,
io
)
!=
0
)
{
sError
(
"failed to create sync consumer thread since %s"
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
}
// start tmr thread
io
->
syncTimerManager
=
taosTmrInit
(
1000
,
50
,
10000
,
"SYNC"
);
io
->
syncTimer
=
taosTmrStart
(
syncTick
,
1000
,
io
,
io
->
syncTimerManager
);
return
0
;
}
static
int32_t
doSyncIOStop
(
SSyncIO
*
io
)
{
atomic_store_8
(
&
io
->
isStart
,
0
);
pthread_join
(
io
->
tid
,
NULL
);
return
0
;
}
static
int32_t
doSyncIOPing
(
SSyncIO
*
io
)
{
SRpcMsg
rpcMsg
,
rspMsg
;
rpcMsg
.
pCont
=
rpcMallocCont
(
10
);
snprintf
(
rpcMsg
.
pCont
,
10
,
"ping"
);
rpcMsg
.
contLen
=
10
;
rpcMsg
.
handle
=
NULL
;
rpcMsg
.
msgType
=
1
;
rpcSendRequest
(
io
->
clientRpc
,
&
io
->
epSet
,
&
rpcMsg
,
NULL
);
return
0
;
}
static
int32_t
doSyncIOOnMessage
(
struct
SSyncIO
*
io
,
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
return
0
;
}
static
int32_t
doSyncIODestroy
(
SSyncIO
*
io
)
{
int8_t
start
=
atomic_load_8
(
&
io
->
isStart
);
assert
(
start
==
0
);
if
(
io
->
serverRpc
!=
NULL
)
{
free
(
io
->
serverRpc
);
io
->
serverRpc
=
NULL
;
}
if
(
io
->
clientRpc
!=
NULL
)
{
free
(
io
->
clientRpc
);
io
->
clientRpc
=
NULL
;
}
if
(
io
->
pMsgQ
!=
NULL
)
{
free
(
io
->
pMsgQ
);
io
->
pMsgQ
=
NULL
;
}
if
(
io
->
pQset
!=
NULL
)
{
free
(
io
->
pQset
);
io
->
pQset
=
NULL
;
}
return
0
;
}
\ No newline at end of file
source/libs/sync/src/syncMain.c
浏览文件 @
e8780ceb
...
...
@@ -14,13 +14,10 @@
*/
#include <stdint.h>
#include "sync
Env
.h"
#include "sync.h"
#include "syncInt.h"
int32_t
syncInit
()
{
int32_t
ret
=
syncEnvStart
();
return
ret
;
}
int32_t
syncInit
()
{
return
0
;
}
void
syncCleanUp
()
{}
...
...
@@ -34,9 +31,4 @@ int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak) { r
ESyncState
syncGetMyRole
(
int64_t
rid
)
{
return
TAOS_SYNC_STATE_LEADER
;
}
void
syncGetNodesRole
(
int64_t
rid
,
SNodesRole
*
pNodeRole
)
{}
SSyncNode
*
syncNodeStart
(
const
SSyncInfo
*
pSyncInfo
)
{
return
NULL
;
}
void
syncNodeStop
(
SSyncNode
*
pSyncNode
)
{}
int32_t
syncNodeForwardToPeer
(
SSyncNode
*
pSyncNode
,
const
SSyncBuffer
*
pBuf
,
bool
isWeak
)
{
return
0
;
}
\ No newline at end of file
void
syncGetNodesRole
(
int64_t
rid
,
SNodesRole
*
pNodeRole
)
{}
\ No newline at end of file
source/libs/sync/src/syncMessage.c
浏览文件 @
e8780ceb
...
...
@@ -14,6 +14,7 @@
*/
#include "syncMessage.h"
#include "sync.h"
#include "syncRaft.h"
void
onMessage
(
SRaft
*
pRaft
,
void
*
pMsg
)
{}
\ No newline at end of file
source/libs/sync/src/syncOnMessage.c
浏览文件 @
e8780ceb
...
...
@@ -13,4 +13,4 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "sync
OnMessage
.h"
#include "sync.h"
source/libs/sync/src/syncRaft.c
浏览文件 @
e8780ceb
...
...
@@ -14,6 +14,47 @@
*/
#include "syncRaft.h"
#include "sync.h"
SRaft
*
raftCreate
(
SRaftId
raftId
,
void
*
data
)
{
SRaft
*
pRaft
=
(
SRaft
*
)
malloc
(
sizeof
(
SRaft
));
assert
(
pRaft
!=
NULL
);
pRaft
->
id
=
raftId
;
pRaft
->
data
=
data
;
pRaft
->
FpPing
=
doRaftPing
;
pRaft
->
FpOnPing
=
onRaftPing
;
pRaft
->
FpOnPingReply
=
onRaftPingReply
;
pRaft
->
FpRequestVote
=
doRaftRequestVote
;
pRaft
->
FpOnRequestVote
=
onRaftRequestVote
;
pRaft
->
FpOnRequestVoteReply
=
onRaftRequestVoteReply
;
pRaft
->
FpAppendEntries
=
doRaftAppendEntries
;
pRaft
->
FpOnAppendEntries
=
onRaftAppendEntries
;
pRaft
->
FpOnAppendEntriesReply
=
onRaftAppendEntriesReply
;
return
pRaft
;
}
static
int32_t
doRaftPing
(
struct
SRaft
*
ths
,
const
RaftPing
*
pMsg
)
{
return
0
;
}
static
int32_t
onRaftPing
(
struct
SRaft
*
ths
,
RaftPing
*
pMsg
)
{
return
0
;
}
static
int32_t
onRaftPingReply
(
struct
SRaft
*
ths
,
RaftPingReply
*
pMsg
)
{
return
0
;
}
static
int32_t
doRaftRequestVote
(
struct
SRaft
*
ths
,
const
RaftRequestVote
*
pMsg
)
{
return
0
;
}
static
int32_t
onRaftRequestVote
(
struct
SRaft
*
ths
,
RaftRequestVote
*
pMsg
)
{
return
0
;
}
static
int32_t
onRaftRequestVoteReply
(
struct
SRaft
*
ths
,
RaftRequestVoteReply
*
pMsg
)
{
return
0
;
}
static
int32_t
doRaftAppendEntries
(
struct
SRaft
*
ths
,
const
RaftAppendEntries
*
pMsg
)
{
return
0
;
}
static
int32_t
onRaftAppendEntries
(
struct
SRaft
*
ths
,
RaftAppendEntries
*
pMsg
)
{
return
0
;
}
static
int32_t
onRaftAppendEntriesReply
(
struct
SRaft
*
ths
,
RaftAppendEntriesReply
*
pMsg
)
{
return
0
;
}
int32_t
raftPropose
(
SRaft
*
pRaft
,
const
SSyncBuffer
*
pBuf
,
bool
isWeak
)
{
return
0
;
}
...
...
source/libs/sync/src/syncRaftEntry.c
浏览文件 @
e8780ceb
...
...
@@ -13,4 +13,4 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "sync
RaftEntry
.h"
#include "sync.h"
source/libs/sync/src/syncRaftLog.c
浏览文件 @
e8780ceb
...
...
@@ -14,6 +14,7 @@
*/
#include "syncRaftLog.h"
#include "sync.h"
int32_t
raftLogAppendEntry
(
struct
SSyncLogStore
*
pLogStore
,
SSyncBuffer
*
pBuf
)
{
return
0
;
}
...
...
source/libs/sync/src/syncRaftStore.c
浏览文件 @
e8780ceb
...
...
@@ -14,118 +14,12 @@
*/
#include "syncRaftStore.h"
#include "
cJSON
.h"
#include "
sync
.h"
SRaftStore
*
raftStoreOpen
(
const
char
*
path
)
{
int32_t
ret
;
int32_t
currentTerm
(
SyncTerm
*
pCurrentTerm
)
{
return
0
;
}
SRaftStore
*
pRaftStore
=
malloc
(
sizeof
(
SRaftStore
));
if
(
pRaftStore
==
NULL
)
{
sError
(
"raftStoreOpen malloc error"
);
return
NULL
;
}
memset
(
pRaftStore
,
0
,
sizeof
(
*
pRaftStore
));
snprintf
(
pRaftStore
->
path
,
sizeof
(
pRaftStore
->
path
),
"%s"
,
path
);
int32_t
persistCurrentTerm
(
SyncTerm
currentTerm
)
{
return
0
;
}
char
storeBuf
[
RAFT_STORE_BLOCK_SIZE
];
memset
(
storeBuf
,
0
,
sizeof
(
storeBuf
));
int32_t
voteFor
(
SRaftId
*
pRaftId
)
{
return
0
;
}
if
(
!
raftStoreFileExist
(
pRaftStore
->
path
))
{
ret
=
raftStoreInit
(
pRaftStore
);
assert
(
ret
==
0
);
}
pRaftStore
->
fd
=
taosOpenFileReadWrite
(
pRaftStore
->
path
);
if
(
pRaftStore
->
fd
<
0
)
{
return
NULL
;
}
int
len
=
taosReadFile
(
pRaftStore
->
fd
,
storeBuf
,
sizeof
(
storeBuf
));
assert
(
len
==
RAFT_STORE_BLOCK_SIZE
);
ret
=
raftStoreDeserialize
(
pRaftStore
,
storeBuf
,
len
);
assert
(
ret
==
0
);
return
pRaftStore
;
}
static
int32_t
raftStoreInit
(
SRaftStore
*
pRaftStore
)
{
pRaftStore
->
fd
=
taosOpenFileCreateWrite
(
pRaftStore
->
path
);
if
(
pRaftStore
->
fd
<
0
)
{
return
-
1
;
}
pRaftStore
->
currentTerm
=
0
;
pRaftStore
->
voteFor
.
addr
=
0
;
pRaftStore
->
voteFor
.
vgId
=
0
;
int32_t
ret
=
raftStorePersist
(
pRaftStore
);
assert
(
ret
==
0
);
taosCloseFile
(
pRaftStore
->
fd
);
return
0
;
}
int32_t
raftStoreClose
(
SRaftStore
*
pRaftStore
)
{
taosCloseFile
(
pRaftStore
->
fd
);
free
(
pRaftStore
);
return
0
;
}
int32_t
raftStorePersist
(
SRaftStore
*
pRaftStore
)
{
int32_t
ret
;
char
storeBuf
[
RAFT_STORE_BLOCK_SIZE
];
ret
=
raftStoreSerialize
(
pRaftStore
,
storeBuf
,
sizeof
(
storeBuf
));
assert
(
ret
==
0
);
taosLSeekFile
(
pRaftStore
->
fd
,
0
,
SEEK_SET
);
ret
=
taosWriteFile
(
pRaftStore
->
fd
,
storeBuf
,
sizeof
(
storeBuf
));
assert
(
ret
==
RAFT_STORE_BLOCK_SIZE
);
fsync
(
pRaftStore
->
fd
);
return
0
;
}
static
bool
raftStoreFileExist
(
char
*
path
)
{
return
taosStatFile
(
path
,
NULL
,
NULL
)
>=
0
;
}
int32_t
raftStoreSerialize
(
SRaftStore
*
pRaftStore
,
char
*
buf
,
size_t
len
)
{
cJSON
*
pRoot
=
cJSON_CreateObject
();
cJSON_AddNumberToObject
(
pRoot
,
"current_term"
,
pRaftStore
->
currentTerm
);
cJSON_AddNumberToObject
(
pRoot
,
"vote_for_addr"
,
pRaftStore
->
voteFor
.
addr
);
cJSON_AddNumberToObject
(
pRoot
,
"vote_for_vgid"
,
pRaftStore
->
voteFor
.
vgId
);
char
*
serialized
=
cJSON_Print
(
pRoot
);
int
len2
=
strlen
(
serialized
);
assert
(
len2
<
len
);
memset
(
buf
,
0
,
len
);
snprintf
(
buf
,
len
,
"%s"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pRoot
);
return
0
;
}
int32_t
raftStoreDeserialize
(
SRaftStore
*
pRaftStore
,
char
*
buf
,
size_t
len
)
{
assert
(
len
>
0
&&
len
<=
RAFT_STORE_BLOCK_SIZE
);
cJSON
*
pRoot
=
cJSON_Parse
(
buf
);
cJSON
*
pCurrentTerm
=
cJSON_GetObjectItem
(
pRoot
,
"current_term"
);
pRaftStore
->
currentTerm
=
pCurrentTerm
->
valueint
;
cJSON
*
pVoteForAddr
=
cJSON_GetObjectItem
(
pRoot
,
"vote_for_addr"
);
pRaftStore
->
voteFor
.
addr
=
pVoteForAddr
->
valueint
;
cJSON
*
pVoteForVgid
=
cJSON_GetObjectItem
(
pRoot
,
"vote_for_vgid"
);
pRaftStore
->
voteFor
.
vgId
=
pVoteForVgid
->
valueint
;
cJSON_Delete
(
pRoot
);
return
0
;
}
void
raftStorePrint
(
SRaftStore
*
pRaftStore
)
{
char
storeBuf
[
RAFT_STORE_BLOCK_SIZE
];
raftStoreSerialize
(
pRaftStore
,
storeBuf
,
sizeof
(
storeBuf
));
printf
(
"%s
\n
"
,
storeBuf
);
}
int32_t
persistVoteFor
(
SRaftId
*
pRaftId
)
{
return
0
;
}
\ No newline at end of file
source/libs/sync/src/syncReplication.c
浏览文件 @
e8780ceb
...
...
@@ -13,4 +13,4 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "sync
Replication
.h"
#include "sync.h"
source/libs/sync/src/syncRequestVote.c
浏览文件 @
e8780ceb
...
...
@@ -14,6 +14,7 @@
*/
#include "syncRequestVote.h"
#include "sync.h"
void
requestVote
(
SRaft
*
pRaft
,
const
SyncRequestVote
*
pMsg
)
{
// TLA+ Spec
...
...
source/libs/sync/src/syncRequestVoteReply.c
浏览文件 @
e8780ceb
...
...
@@ -14,6 +14,7 @@
*/
#include "syncRequestVoteReply.h"
#include "sync.h"
void
onRequestVoteReply
(
SRaft
*
pRaft
,
const
SyncRequestVoteReply
*
pMsg
)
{
// TLA+ Spec
...
...
source/libs/sync/src/syncSnapshot.c
浏览文件 @
e8780ceb
...
...
@@ -14,6 +14,7 @@
*/
#include "syncSnapshot.h"
#include "sync.h"
#include "syncRaft.h"
int32_t
takeSnapshot
(
SSyncFSM
*
pFsm
,
SSnapshot
*
pSnapshot
)
{
return
0
;
}
...
...
source/libs/sync/src/syncTimeout.c
浏览文件 @
e8780ceb
...
...
@@ -14,5 +14,6 @@
*/
#include "syncTimeout.h"
#include "sync.h"
void
onTimeout
(
SRaft
*
pRaft
,
void
*
pMsg
)
{}
\ No newline at end of file
source/libs/sync/src/syncVoteMgr.c
已删除
100644 → 0
浏览文件 @
b6399528
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "syncVoteMgr.h"
source/libs/sync/test/CMakeLists.txt
已删除
100644 → 0
浏览文件 @
b6399528
add_executable
(
syncTest
""
)
add_executable
(
syncEnvTest
""
)
add_executable
(
syncPingTest
""
)
target_sources
(
syncTest
PRIVATE
"syncTest.cpp"
)
target_sources
(
syncEnvTest
PRIVATE
"syncEnvTest.cpp"
)
target_sources
(
syncPingTest
PRIVATE
"syncPingTest.cpp"
)
target_include_directories
(
syncTest
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/libs/sync"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_include_directories
(
syncEnvTest
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/libs/sync"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_include_directories
(
syncPingTest
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/libs/sync"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_link_libraries
(
syncTest
sync
gtest_main
)
target_link_libraries
(
syncEnvTest
sync
gtest_main
)
target_link_libraries
(
syncPingTest
sync
gtest_main
)
enable_testing
()
add_test
(
NAME sync_test
COMMAND syncTest
)
source/libs/sync/test/syncEnvTest.cpp
已删除
100644 → 0
浏览文件 @
b6399528
#include "syncEnv.h"
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
void
logTest
()
{
sTrace
(
"--- sync log test: trace"
);
sDebug
(
"--- sync log test: debug"
);
sInfo
(
"--- sync log test: info"
);
sWarn
(
"--- sync log test: warn"
);
sError
(
"--- sync log test: error"
);
sFatal
(
"--- sync log test: fatal"
);
}
void
doSync
()
{
SSyncInfo
syncInfo
;
syncInfo
.
vgId
=
1
;
SSyncCfg
*
pCfg
=
&
syncInfo
.
syncCfg
;
pCfg
->
replicaNum
=
3
;
pCfg
->
nodeInfo
[
0
].
nodePort
=
7010
;
taosGetFqdn
(
pCfg
->
nodeInfo
[
0
].
nodeFqdn
);
pCfg
->
nodeInfo
[
1
].
nodePort
=
7110
;
taosGetFqdn
(
pCfg
->
nodeInfo
[
1
].
nodeFqdn
);
pCfg
->
nodeInfo
[
2
].
nodePort
=
7210
;
taosGetFqdn
(
pCfg
->
nodeInfo
[
2
].
nodeFqdn
);
SSyncNode
*
pSyncNode
=
syncNodeStart
(
&
syncInfo
);
assert
(
pSyncNode
!=
NULL
);
}
int
main
()
{
taosInitLog
((
char
*
)
"syncEnvTest.log"
,
100000
,
10
);
tsAsyncLog
=
0
;
sDebugFlag
=
143
+
64
;
logTest
();
int32_t
ret
=
syncIOStart
();
assert
(
ret
==
0
);
ret
=
syncEnvStart
();
assert
(
ret
==
0
);
doSync
();
while
(
1
)
{
taosMsleep
(
1000
);
}
return
0
;
}
source/libs/sync/test/syncPingTest.cpp
已删除
100644 → 0
浏览文件 @
b6399528
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
void
logTest
()
{
sTrace
(
"--- sync log test: trace"
);
sDebug
(
"--- sync log test: debug"
);
sInfo
(
"--- sync log test: info"
);
sWarn
(
"--- sync log test: warn"
);
sError
(
"--- sync log test: error"
);
sFatal
(
"--- sync log test: fatal"
);
}
void
doSync
()
{
SSyncInfo
syncInfo
;
syncInfo
.
vgId
=
1
;
SSyncCfg
*
pCfg
=
&
syncInfo
.
syncCfg
;
pCfg
->
myIndex
=
0
;
pCfg
->
replicaNum
=
3
;
pCfg
->
nodeInfo
[
0
].
nodePort
=
7010
;
taosGetFqdn
(
pCfg
->
nodeInfo
[
0
].
nodeFqdn
);
pCfg
->
nodeInfo
[
1
].
nodePort
=
7110
;
taosGetFqdn
(
pCfg
->
nodeInfo
[
1
].
nodeFqdn
);
pCfg
->
nodeInfo
[
2
].
nodePort
=
7210
;
taosGetFqdn
(
pCfg
->
nodeInfo
[
2
].
nodeFqdn
);
SSyncNode
*
pSyncNode
=
syncNodeStart
(
&
syncInfo
);
assert
(
pSyncNode
!=
NULL
);
}
int
main
()
{
taosInitLog
((
char
*
)
"syncPingTest.log"
,
100000
,
10
);
tsAsyncLog
=
0
;
sDebugFlag
=
143
+
64
;
logTest
();
int32_t
ret
=
syncIOStart
();
assert
(
ret
==
0
);
ret
=
syncEnvStart
();
assert
(
ret
==
0
);
doSync
();
while
(
1
)
{
taosMsleep
(
1000
);
}
return
0
;
}
source/libs/sync/test/syncTest.cpp
浏览文件 @
e8780ceb
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
void
*
pingFunc
(
void
*
param
)
{
SSyncIO
*
io
=
(
SSyncIO
*
)
param
;
while
(
1
)
{
sDebug
(
"io->ping"
);
io
->
ping
(
io
);
sleep
(
1
);
}
return
NULL
;
}
int
main
()
{
taosInitLog
((
char
*
)
"syncTest.log"
,
100000
,
10
);
tsAsyncLog
=
0
;
sDebugFlag
=
143
+
64
;
sTrace
(
"sync log test: trace"
);
sDebug
(
"sync log test: debug"
);
sInfo
(
"sync log test: info"
);
sWarn
(
"sync log test: warn"
);
sError
(
"sync log test: error"
);
sFatal
(
"sync log test: fatal"
);
SRaftStore
*
pRaftStore
=
raftStoreOpen
(
"./raft_store.json"
);
assert
(
pRaftStore
!=
NULL
);
raftStorePrint
(
pRaftStore
);
pRaftStore
->
currentTerm
=
100
;
pRaftStore
->
voteFor
.
addr
=
200
;
pRaftStore
->
voteFor
.
vgId
=
300
;
raftStorePrint
(
pRaftStore
);
raftStorePersist
(
pRaftStore
);
sDebug
(
"sync test"
);
SSyncIO
*
syncIO
=
syncIOCreate
();
assert
(
syncIO
!=
NULL
);
syncIO
->
start
(
syncIO
);
sleep
(
2
);
pthread_t
tid
;
pthread_create
(
&
tid
,
NULL
,
pingFunc
,
syncIO
);
while
(
1
)
{
sleep
(
1
);
}
return
0
;
printf
(
"test
\n
"
);
return
0
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录