Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
06bbae7d
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
06bbae7d
编写于
3月 03, 2022
作者:
L
Li Minghao
提交者:
GitHub
3月 03, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #10517 from taosdata/feature/3.0_mhli
Feature/3.0 mhli
上级
11064f18
a78d7028
变更
26
隐藏空白更改
内联
并排
Showing
26 changed file
with
1765 addition
and
291 deletion
+1765
-291
include/libs/sync/sync.h
include/libs/sync/sync.h
+13
-9
source/libs/sync/inc/syncEnv.h
source/libs/sync/inc/syncEnv.h
+11
-4
source/libs/sync/inc/syncIO.h
source/libs/sync/inc/syncIO.h
+20
-25
source/libs/sync/inc/syncInt.h
source/libs/sync/inc/syncInt.h
+64
-42
source/libs/sync/inc/syncMessage.h
source/libs/sync/inc/syncMessage.h
+82
-23
source/libs/sync/inc/syncRaft.h
source/libs/sync/inc/syncRaft.h
+4
-0
source/libs/sync/inc/syncRaftStore.h
source/libs/sync/inc/syncRaftStore.h
+3
-2
source/libs/sync/inc/syncUtil.h
source/libs/sync/inc/syncUtil.h
+57
-0
source/libs/sync/src/syncEnv.c
source/libs/sync/src/syncEnv.c
+44
-2
source/libs/sync/src/syncIO.c
source/libs/sync/src/syncIO.c
+210
-128
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+226
-31
source/libs/sync/src/syncMessage.c
source/libs/sync/src/syncMessage.c
+261
-1
source/libs/sync/src/syncRaft.c
source/libs/sync/src/syncRaft.c
+4
-0
source/libs/sync/src/syncRaftStore.c
source/libs/sync/src/syncRaftStore.c
+110
-8
source/libs/sync/src/syncUtil.c
source/libs/sync/src/syncUtil.c
+90
-0
source/libs/sync/test/CMakeLists.txt
source/libs/sync/test/CMakeLists.txt
+98
-0
source/libs/sync/test/syncEncodeTest.cpp
source/libs/sync/test/syncEncodeTest.cpp
+184
-0
source/libs/sync/test/syncEnvTest.cpp
source/libs/sync/test/syncEnvTest.cpp
+5
-4
source/libs/sync/test/syncIOSendMsgClientTest.cpp
source/libs/sync/test/syncIOSendMsgClientTest.cpp
+50
-0
source/libs/sync/test/syncIOSendMsgServerTest.cpp
source/libs/sync/test/syncIOSendMsgServerTest.cpp
+33
-0
source/libs/sync/test/syncIOSendMsgTest.cpp
source/libs/sync/test/syncIOSendMsgTest.cpp
+50
-0
source/libs/sync/test/syncIOTickPingTest.cpp
source/libs/sync/test/syncIOTickPingTest.cpp
+35
-0
source/libs/sync/test/syncIOTickQTest.cpp
source/libs/sync/test/syncIOTickQTest.cpp
+35
-0
source/libs/sync/test/syncPingTest.cpp
source/libs/sync/test/syncPingTest.cpp
+31
-9
source/libs/sync/test/syncRaftStoreTest.cpp
source/libs/sync/test/syncRaftStoreTest.cpp
+42
-0
source/libs/sync/test/syncTest.cpp
source/libs/sync/test/syncTest.cpp
+3
-3
未找到文件。
include/libs/sync/sync.h
浏览文件 @
06bbae7d
...
...
@@ -34,7 +34,9 @@ typedef enum {
TAOS_SYNC_STATE_FOLLOWER
=
0
,
TAOS_SYNC_STATE_CANDIDATE
=
1
,
TAOS_SYNC_STATE_LEADER
=
2
,
}
ESyncState
;
}
ESyncRole
;
typedef
ESyncRole
ESyncState
;
typedef
struct
SSyncBuffer
{
void
*
data
;
...
...
@@ -69,15 +71,15 @@ typedef struct SSyncFSM {
// when value in pBuf finish a raft flow, FpCommitCb is called, code indicates the result
// user can do something according to the code and isWeak. for example, write data into tsdb
void
(
*
FpCommitCb
)(
struct
SSyncFSM
*
pFsm
,
const
S
SyncBuffer
*
pBuf
,
SyncIndex
index
,
bool
isWeak
,
int32_t
code
);
void
(
*
FpCommitCb
)(
struct
SSyncFSM
*
pFsm
,
const
S
RpcMsg
*
pBuf
,
SyncIndex
index
,
bool
isWeak
,
int32_t
code
);
// when value in pBuf has been written into local log store, FpPreCommitCb is called, code indicates the result
// user can do something according to the code and isWeak. for example, write data into tsdb
void
(
*
FpPreCommitCb
)(
struct
SSyncFSM
*
pFsm
,
const
S
SyncBuffer
*
pBuf
,
SyncIndex
index
,
bool
isWeak
,
int32_t
code
);
void
(
*
FpPreCommitCb
)(
struct
SSyncFSM
*
pFsm
,
const
S
RpcMsg
*
pBuf
,
SyncIndex
index
,
bool
isWeak
,
int32_t
code
);
// when log entry is updated by a new one, FpRollBackCb is called
// user can do something to roll back. for example, delete data from tsdb, or just ignore it
void
(
*
FpRollBackCb
)(
struct
SSyncFSM
*
pFsm
,
const
S
SyncBuffer
*
pBuf
,
SyncIndex
index
,
bool
isWeak
,
int32_t
code
);
void
(
*
FpRollBackCb
)(
struct
SSyncFSM
*
pFsm
,
const
S
RpcMsg
*
pBuf
,
SyncIndex
index
,
bool
isWeak
,
int32_t
code
);
// user should implement this function, use "data" to take snapshot into "snapshot"
int32_t
(
*
FpTakeSnapshot
)(
SSnapshot
*
snapshot
);
...
...
@@ -93,10 +95,10 @@ typedef struct SSyncLogStore {
void
*
data
;
// append one log entry
int32_t
(
*
appendEntry
)(
struct
SSyncLogStore
*
pLogStore
,
S
SyncBuffer
*
pBuf
);
int32_t
(
*
appendEntry
)(
struct
SSyncLogStore
*
pLogStore
,
S
RpcMsg
*
pBuf
);
// get one log entry, user need to free pBuf->data
int32_t
(
*
getEntry
)(
struct
SSyncLogStore
*
pLogStore
,
SyncIndex
index
,
S
SyncBuffer
*
pBuf
);
int32_t
(
*
getEntry
)(
struct
SSyncLogStore
*
pLogStore
,
SyncIndex
index
,
S
RpcMsg
*
pBuf
);
// update log store commit index with "index"
int32_t
(
*
updateCommitIndex
)(
struct
SSyncLogStore
*
pLogStore
,
SyncIndex
index
);
...
...
@@ -135,7 +137,9 @@ typedef struct SSyncInfo {
SSyncCfg
syncCfg
;
char
path
[
TSDB_FILENAME_LEN
];
SSyncFSM
*
pFsm
;
int32_t
(
*
FpSendMsg
)(
void
*
handle
,
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
);
void
*
rpcClient
;
int32_t
(
*
FpSendMsg
)(
void
*
rpcClient
,
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
);
}
SSyncInfo
;
...
...
@@ -149,8 +153,8 @@ int64_t syncStart(const SSyncInfo* pSyncInfo);
void
syncStop
(
int64_t
rid
);
int32_t
syncReconfig
(
int64_t
rid
,
const
SSyncCfg
*
pSyncCfg
);
//
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak);
int32_t
syncForwardToPeer
(
int64_t
rid
,
const
SSyncBuffer
*
pBuf
,
bool
isWeak
);
int32_t
syncForwardToPeer
(
int64_t
rid
,
const
SRpcMsg
*
pBuf
,
bool
isWeak
);
//
int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak);
ESyncState
syncGetMyRole
(
int64_t
rid
);
void
syncGetNodesRole
(
int64_t
rid
,
SNodesRole
*
pNodeRole
);
...
...
source/libs/sync/inc/syncEnv.h
浏览文件 @
06bbae7d
...
...
@@ -26,19 +26,26 @@ extern "C" {
#include "syncInt.h"
#include "taosdef.h"
#include "trpc.h"
#include "ttimer.h"
#define TIMER_MAX_MS 0x7FFFFFFF
typedef
struct
SSyncEnv
{
void
*
pTimer
;
void
*
pTimerManager
;
tmr_h
pEnvTickTimer
;
tmr_h
pTimerManager
;
char
name
[
128
];
}
SSyncEnv
;
extern
SSyncEnv
*
gSyncEnv
;
int32_t
syncEnvStart
();
int32_t
syncEnvStop
();
static
int32_t
doSyncEnvStart
(
SSyncEnv
*
pSyncEnv
);
tmr_h
syncEnvStartTimer
(
TAOS_TMR_CALLBACK
fp
,
int
mseconds
,
void
*
param
);
static
int32_t
doSyncEnvStop
(
SSyncEnv
*
pSyncEnv
);
void
syncEnvStopTimer
(
tmr_h
*
pTimer
);
#ifdef __cplusplus
}
...
...
source/libs/sync/inc/syncIO.h
浏览文件 @
06bbae7d
...
...
@@ -30,42 +30,37 @@ extern "C" {
#include "trpc.h"
typedef
struct
SSyncIO
{
void
*
serverRpc
;
void
*
clientRpc
;
STaosQueue
*
pMsgQ
;
STaosQset
*
pQset
;
pthread_t
tid
;
int8_t
isStart
;
pthread_t
consumerTid
;
SEpSet
epSet
;
void
*
serverRpc
;
void
*
clientRpc
;
SEpSet
myAddr
;
void
*
syncTimer
;
void
*
syncTimerManager
;
int32_t
(
*
start
)(
struct
SSyncIO
*
ths
);
int32_t
(
*
stop
)(
struct
SSyncIO
*
ths
);
int32_t
(
*
ping
)(
struct
SSyncIO
*
ths
);
int32_t
(
*
onMsg
)(
struct
SSyncIO
*
ths
,
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
int32_t
(
*
destroy
)(
struct
SSyncIO
*
ths
);
void
*
ioTimerTickQ
;
void
*
ioTimerTickPing
;
void
*
ioTimerManager
;
void
*
pSyncNode
;
int32_t
(
*
FpOnPing
)(
struct
SSyncNode
*
ths
,
SyncPing
*
pMsg
);
int32_t
(
*
FpOnSyncPing
)(
SSyncNode
*
pSyncNode
,
SyncPing
*
pMsg
);
int32_t
(
*
FpOnSyncPingReply
)(
SSyncNode
*
pSyncNode
,
SyncPingReply
*
pMsg
);
int32_t
(
*
FpOnSyncRequestVote
)(
SSyncNode
*
pSyncNode
,
SyncRequestVote
*
pMsg
);
int32_t
(
*
FpOnSyncRequestVoteReply
)(
SSyncNode
*
pSyncNode
,
SyncRequestVoteReply
*
pMsg
);
int32_t
(
*
FpOnSyncAppendEntries
)(
SSyncNode
*
pSyncNode
,
SyncAppendEntries
*
pMsg
);
int32_t
(
*
FpOnSyncAppendEntriesReply
)(
SSyncNode
*
pSyncNode
,
SyncAppendEntriesReply
*
pMsg
);
int8_t
isStart
;
}
SSyncIO
;
extern
SSyncIO
*
gSyncIO
;
int32_t
syncIOStart
();
int32_t
syncIOStop
();
int32_t
syncIOSendMsg
(
void
*
handle
,
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
);
SSyncIO
*
syncIOCreate
();
static
int32_t
doSyncIOStart
(
SSyncIO
*
io
);
static
int32_t
doSyncIOStop
(
SSyncIO
*
io
);
static
int32_t
doSyncIOPing
(
SSyncIO
*
io
);
static
int32_t
doSyncIOOnMsg
(
struct
SSyncIO
*
io
,
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
static
int32_t
doSyncIODestroy
(
SSyncIO
*
io
);
int32_t
syncIOStart
(
char
*
host
,
uint16_t
port
);
int32_t
syncIOStop
();
int32_t
syncIOSendMsg
(
void
*
clientRpc
,
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
);
int32_t
syncIOTickQ
();
int32_t
syncIOTickPing
();
#ifdef __cplusplus
}
...
...
source/libs/sync/inc/syncInt.h
浏览文件 @
06bbae7d
...
...
@@ -26,6 +26,7 @@ extern "C" {
#include "sync.h"
#include "taosdef.h"
#include "tlog.h"
#include "ttimer.h"
extern
int32_t
sDebugFlag
;
...
...
@@ -47,23 +48,23 @@ extern int32_t sDebugFlag;
taosPrintLog("SYN WARN ", sDebugFlag, __VA_ARGS__); \
} \
}
#define sInfo(...) \
{ \
if (sDebugFlag & DEBUG_INFO) { \
taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); \
} \
#define sInfo(...)
\
{
\
if (sDebugFlag & DEBUG_INFO) {
\
taosPrintLog("SYN
INFO
", sDebugFlag, __VA_ARGS__); \
}
\
}
#define sDebug(...) \
{ \
if (sDebugFlag & DEBUG_DEBUG) { \
taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); \
} \
#define sDebug(...)
\
{
\
if (sDebugFlag & DEBUG_DEBUG) {
\
taosPrintLog("SYN
DEBUG
", sDebugFlag, __VA_ARGS__); \
}
\
}
#define sTrace(...) \
{ \
if (sDebugFlag & DEBUG_TRACE) { \
taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); \
} \
#define sTrace(...)
\
{
\
if (sDebugFlag & DEBUG_TRACE) {
\
taosPrintLog("SYN
TRACE
", sDebugFlag, __VA_ARGS__); \
}
\
}
struct
SRaft
;
...
...
@@ -87,35 +88,64 @@ typedef struct SyncAppendEntries SyncAppendEntries;
struct
SyncAppendEntriesReply
;
typedef
struct
SyncAppendEntriesReply
SyncAppendEntriesReply
;
typedef
struct
SSyncNode
{
int8_t
replica
;
int8_t
quorum
;
struct
SSyncEnv
;
typedef
struct
SSyncEnv
SSyncEnv
;
typedef
struct
SRaftId
{
SyncNodeId
addr
;
// typedef uint64_t SyncNodeId;
SyncGroupId
vgId
;
// typedef int32_t SyncGroupId;
}
SRaftId
;
typedef
struct
SSyncNode
{
SyncGroupId
vgId
;
SSyncCfg
syncCfg
;
char
path
[
TSDB_FILENAME_LEN
];
SSyncFSM
*
pFsm
;
// passed from outside
void
*
rpcClient
;
int32_t
(
*
FpSendMsg
)(
void
*
rpcClient
,
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
);
SRaft
*
pRaft
;
int32_t
refCount
;
int64_t
rid
;
int32_t
(
*
FpPing
)(
struct
SSyncNode
*
ths
,
const
SyncPing
*
pMsg
);
SNodeInfo
me
;
SNodeInfo
peers
[
TSDB_MAX_REPLICA
];
int32_t
peersNum
;
int32_t
(
*
FpOnPing
)(
struct
SSyncNode
*
ths
,
SyncPing
*
pMsg
);
ESyncRole
role
;
SRaftId
raftId
;
int32_t
(
*
FpOnPingReply
)(
struct
SSyncNode
*
ths
,
SyncPingReply
*
pMsg
);
tmr_h
pPingTimer
;
int32_t
pingTimerMS
;
uint8_t
pingTimerStart
;
TAOS_TMR_CALLBACK
FpPingTimer
;
// Timer Fp
uint64_t
pingTimerCounter
;
int32_t
(
*
FpRequestVote
)(
struct
SSyncNode
*
ths
,
const
SyncRequestVote
*
pMsg
);
tmr_h
pElectTimer
;
int32_t
electTimerMS
;
uint8_t
electTimerStart
;
TAOS_TMR_CALLBACK
FpElectTimer
;
// Timer Fp
uint64_t
electTimerCounter
;
int32_t
(
*
FpOnRequestVote
)(
struct
SSyncNode
*
ths
,
SyncRequestVote
*
pMsg
);
tmr_h
pHeartbeatTimer
;
int32_t
heartbeatTimerMS
;
uint8_t
heartbeatTimerStart
;
TAOS_TMR_CALLBACK
FpHeartbeatTimer
;
// Timer Fp
uint64_t
heartbeatTimerCounter
;
int32_t
(
*
FpOnRequestVoteReply
)(
struct
SSyncNode
*
ths
,
SyncRequestVoteReply
*
pMsg
);
// callback
int32_t
(
*
FpOnPing
)(
SSyncNode
*
ths
,
SyncPing
*
pMsg
);
int32_t
(
*
Fp
AppendEntries
)(
struct
SSyncNode
*
ths
,
const
SyncAppendEntries
*
pMsg
);
int32_t
(
*
Fp
OnPingReply
)(
SSyncNode
*
ths
,
SyncPingReply
*
pMsg
);
int32_t
(
*
FpOn
AppendEntries
)(
struct
SSyncNode
*
ths
,
SyncAppendEntries
*
pMsg
);
int32_t
(
*
FpOn
RequestVote
)(
SSyncNode
*
ths
,
SyncRequestVote
*
pMsg
);
int32_t
(
*
FpOn
AppendEntriesReply
)(
struct
SSyncNode
*
ths
,
SyncAppendEntries
Reply
*
pMsg
);
int32_t
(
*
FpOn
RequestVoteReply
)(
SSyncNode
*
ths
,
SyncRequestVote
Reply
*
pMsg
);
int32_t
(
*
FpSendMsg
)(
void
*
handle
,
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
);
int32_t
(
*
FpOnAppendEntries
)(
SSyncNode
*
ths
,
SyncAppendEntries
*
pMsg
);
int32_t
(
*
FpOnAppendEntriesReply
)(
SSyncNode
*
ths
,
SyncAppendEntriesReply
*
pMsg
);
}
SSyncNode
;
...
...
@@ -123,23 +153,15 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo);
void
syncNodeClose
(
SSyncNode
*
pSyncNode
);
static
int32_t
doSyncNodePing
(
struct
SSyncNode
*
ths
,
const
SyncPing
*
pMsg
);
static
int32_t
onSyncNodePing
(
struct
SSyncNode
*
ths
,
SyncPing
*
pMsg
);
static
int32_t
onSyncNodePingReply
(
struct
SSyncNode
*
ths
,
SyncPingReply
*
pMsg
);
static
int32_t
doSyncNodeRequestVote
(
struct
SSyncNode
*
ths
,
const
SyncRequestVote
*
pMsg
);
static
int32_t
onSyncNodeRequestVote
(
struct
SSyncNode
*
ths
,
SyncRequestVote
*
pMsg
);
void
syncNodePingAll
(
SSyncNode
*
pSyncNode
);
static
int32_t
onSyncNodeRequestVoteReply
(
struct
SSyncNode
*
ths
,
SyncRequestVoteReply
*
pMsg
);
void
syncNodePingPeers
(
SSyncNode
*
pSyncNode
);
static
int32_t
doSyncNodeAppendEntries
(
struct
SSyncNode
*
ths
,
const
SyncAppendEntries
*
pMsg
);
void
syncNodePingSelf
(
SSyncNode
*
pSyncNode
);
static
int32_t
onSyncNodeAppendEntries
(
struct
SSyncNode
*
ths
,
SyncAppendEntries
*
pMsg
);
int32_t
syncNodeStartPingTimer
(
SSyncNode
*
pSyncNode
);
static
int32_t
onSyncNodeAppendEntriesReply
(
struct
SSyncNode
*
ths
,
SyncAppendEntriesReply
*
pMsg
);
int32_t
syncNodeStopPingTimer
(
SSyncNode
*
pSyncNode
);
#ifdef __cplusplus
}
...
...
source/libs/sync/inc/syncMessage.h
浏览文件 @
06bbae7d
...
...
@@ -23,13 +23,15 @@ extern "C" {
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "cJSON.h"
#include "sync.h"
#include "syncRaftEntry.h"
#include "taosdef.h"
// encode as uint64
typedef
enum
ESyncMessageType
{
SYNC_PING
=
0
,
SYNC_PING_REPLY
,
SYNC_PING
=
101
,
SYNC_PING_REPLY
=
103
,
SYNC_CLIENT_REQUEST
,
SYNC_CLIENT_REQUEST_REPLY
,
SYNC_REQUEST_VOTE
,
...
...
@@ -38,29 +40,86 @@ typedef enum ESyncMessageType {
SYNC_APPEND_ENTRIES_REPLY
,
}
ESyncMessageType
;
/*
typedef struct SRaftId {
SyncNodeId addr; // typedef uint64_t SyncNodeId;
SyncGroupId vgId; // typedef int32_t SyncGroupId;
} SRaftId;
*/
typedef
struct
SyncPing
{
ESyncMessageType
msgType
;
const
SSyncBuffer
*
pData
;
}
SyncPing
,
RaftPing
;
uint32_t
bytes
;
uint32_t
msgType
;
SRaftId
srcId
;
SRaftId
destId
;
uint32_t
dataLen
;
char
data
[];
}
SyncPing
;
#define SYNC_PING_FIX_LEN (sizeof(uint32_t) + sizeof(uint32_t) + sizeof(SRaftId) + sizeof(SRaftId) + sizeof(uint32_t))
SyncPing
*
syncPingBuild
(
uint32_t
dataLen
);
void
syncPingDestroy
(
SyncPing
*
pMsg
);
void
syncPingSerialize
(
const
SyncPing
*
pMsg
,
char
*
buf
,
uint32_t
bufLen
);
void
syncPingDeserialize
(
const
char
*
buf
,
uint32_t
len
,
SyncPing
*
pMsg
);
void
syncPing2RpcMsg
(
const
SyncPing
*
pMsg
,
SRpcMsg
*
pRpcMsg
);
void
syncPingFromRpcMsg
(
const
SRpcMsg
*
pRpcMsg
,
SyncPing
*
pMsg
);
cJSON
*
syncPing2Json
(
const
SyncPing
*
pMsg
);
SyncPing
*
syncPingBuild2
(
const
SRaftId
*
srcId
,
const
SRaftId
*
destId
,
const
char
*
str
);
SyncPing
*
syncPingBuild3
(
const
SRaftId
*
srcId
,
const
SRaftId
*
destId
);
typedef
struct
SyncPingReply
{
ESyncMessageType
msgType
;
const
SSyncBuffer
*
pData
;
}
SyncPingReply
,
RaftPingReply
;
uint32_t
bytes
;
uint32_t
msgType
;
SRaftId
srcId
;
SRaftId
destId
;
uint32_t
dataLen
;
char
data
[];
}
SyncPingReply
;
#define SYNC_PING_REPLY_FIX_LEN \
(sizeof(uint32_t) + sizeof(uint32_t) + sizeof(SRaftId) + sizeof(SRaftId) + sizeof(uint32_t))
SyncPingReply
*
syncPingReplyBuild
(
uint32_t
dataLen
);
void
syncPingReplyDestroy
(
SyncPingReply
*
pMsg
);
void
syncPingReplySerialize
(
const
SyncPingReply
*
pMsg
,
char
*
buf
,
uint32_t
bufLen
);
void
syncPingReplyDeserialize
(
const
char
*
buf
,
uint32_t
len
,
SyncPingReply
*
pMsg
);
void
syncPingReply2RpcMsg
(
const
SyncPingReply
*
pMsg
,
SRpcMsg
*
pRpcMsg
);
void
syncPingReplyFromRpcMsg
(
const
SRpcMsg
*
pRpcMsg
,
SyncPingReply
*
pMsg
);
cJSON
*
syncPingReply2Json
(
const
SyncPingReply
*
pMsg
);
SyncPingReply
*
syncPingReplyBuild2
(
const
SRaftId
*
srcId
,
const
SRaftId
*
destId
,
const
char
*
str
);
SyncPingReply
*
syncPingReplyBuild3
(
const
SRaftId
*
srcId
,
const
SRaftId
*
destId
);
typedef
struct
SyncClientRequest
{
ESyncMessageType
msgType
;
const
SSyncBuffer
*
pData
;
int64_t
seqNum
;
bool
isWeak
;
}
SyncClientRequest
,
RaftClientRequest
;
ESyncMessageType
msgType
;
char
*
data
;
uint32_t
dataLen
;
int64_t
seqNum
;
bool
isWeak
;
}
SyncClientRequest
;
typedef
struct
SyncClientRequestReply
{
ESyncMessageType
msgType
;
int32_t
errCode
;
const
SSyncBuffer
*
pErrMsg
;
const
SSyncBuffer
*
pLeaderHint
;
}
SyncClientRequestReply
,
RaftClientRequestReply
;
ESyncMessageType
msgType
;
int32_t
errCode
;
SSyncBuffer
*
pErrMsg
;
SSyncBuffer
*
pLeaderHint
;
}
SyncClientRequestReply
;
typedef
struct
SyncRequestVote
{
ESyncMessageType
msgType
;
...
...
@@ -69,7 +128,7 @@ typedef struct SyncRequestVote {
SyncGroupId
vgId
;
SyncIndex
lastLogIndex
;
SyncTerm
lastLogTerm
;
}
SyncRequestVote
,
RaftRequestVote
;
}
SyncRequestVote
;
typedef
struct
SyncRequestVoteReply
{
ESyncMessageType
msgType
;
...
...
@@ -77,7 +136,7 @@ typedef struct SyncRequestVoteReply {
SyncNodeId
nodeId
;
SyncGroupId
vgId
;
bool
voteGranted
;
}
SyncRequestVoteReply
,
RaftRequestVoteReply
;
}
SyncRequestVoteReply
;
typedef
struct
SyncAppendEntries
{
ESyncMessageType
msgType
;
...
...
@@ -86,9 +145,9 @@ typedef struct SyncAppendEntries {
SyncIndex
prevLogIndex
;
SyncTerm
prevLogTerm
;
int32_t
entryCount
;
SSyncRaftEntry
*
logEntries
;
SSyncRaftEntry
*
logEntries
;
SyncIndex
commitIndex
;
}
SyncAppendEntries
,
RaftAppendEntries
;
}
SyncAppendEntries
;
typedef
struct
SyncAppendEntriesReply
{
ESyncMessageType
msgType
;
...
...
@@ -96,7 +155,7 @@ typedef struct SyncAppendEntriesReply {
SyncNodeId
nodeId
;
bool
success
;
SyncIndex
matchIndex
;
}
SyncAppendEntriesReply
,
RaftAppendEntriesReply
;
}
SyncAppendEntriesReply
;
#ifdef __cplusplus
}
...
...
source/libs/sync/inc/syncRaft.h
浏览文件 @
06bbae7d
...
...
@@ -27,6 +27,8 @@ extern "C" {
#include "syncMessage.h"
#include "taosdef.h"
#if 0
typedef struct SRaftId {
SyncNodeId addr;
SyncGroupId vgId;
...
...
@@ -82,6 +84,8 @@ int32_t raftPropose(SRaft* pRaft, const SSyncBuffer* pBuf, bool isWeak);
static int raftSendMsg(SRaftId destRaftId, const void* pMsg, const SRaft* pRaft);
#endif
#ifdef __cplusplus
}
#endif
...
...
source/libs/sync/inc/syncRaftStore.h
浏览文件 @
06bbae7d
...
...
@@ -34,8 +34,9 @@ extern "C" {
typedef
struct
SRaftStore
{
SyncTerm
currentTerm
;
SRaftId
voteFor
;
//FileFd fd;
char
path
[
RAFT_STORE_PATH_LEN
];
// FileFd fd;
TdFilePtr
pFile
;
char
path
[
RAFT_STORE_PATH_LEN
];
}
SRaftStore
;
SRaftStore
*
raftStoreOpen
(
const
char
*
path
);
...
...
source/libs/sync/inc/syncUtil.h
0 → 100644
浏览文件 @
06bbae7d
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_LIBS_SYNC_UTIL_H
#define _TD_LIBS_SYNC_UTIL_H
#ifdef __cplusplus
extern
"C"
{
#endif
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "syncInt.h"
#include "syncMessage.h"
#include "taosdef.h"
// ---- encode / decode
uint64_t
syncUtilAddr2U64
(
const
char
*
host
,
uint16_t
port
);
void
syncUtilU642Addr
(
uint64_t
u64
,
char
*
host
,
size_t
len
,
uint16_t
*
port
);
void
syncUtilnodeInfo2EpSet
(
const
SNodeInfo
*
pNodeInfo
,
SEpSet
*
pEpSet
);
void
syncUtilraftId2EpSet
(
const
SRaftId
*
raftId
,
SEpSet
*
pEpSet
);
void
syncUtilnodeInfo2raftId
(
const
SNodeInfo
*
pNodeInfo
,
SyncGroupId
vgId
,
SRaftId
*
raftId
);
// ---- SSyncBuffer ----
#if 0
void syncUtilbufBuild(SSyncBuffer* syncBuf, size_t len);
void syncUtilbufDestroy(SSyncBuffer* syncBuf);
void syncUtilbufCopy(const SSyncBuffer* src, SSyncBuffer* dest);
void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest);
#endif
#ifdef __cplusplus
}
#endif
#endif
/*_TD_LIBS_SYNC_UTIL_H*/
source/libs/sync/src/syncEnv.c
浏览文件 @
06bbae7d
...
...
@@ -18,6 +18,14 @@
SSyncEnv
*
gSyncEnv
=
NULL
;
// local function -----------------
static
void
syncEnvTick
(
void
*
param
,
void
*
tmrId
);
static
int32_t
doSyncEnvStart
(
SSyncEnv
*
pSyncEnv
);
static
int32_t
doSyncEnvStop
(
SSyncEnv
*
pSyncEnv
);
static
tmr_h
doSyncEnvStartTimer
(
SSyncEnv
*
pSyncEnv
,
TAOS_TMR_CALLBACK
fp
,
int
mseconds
,
void
*
param
);
static
void
doSyncEnvStopTimer
(
SSyncEnv
*
pSyncEnv
,
tmr_h
*
pTimer
);
// --------------------------------
int32_t
syncEnvStart
()
{
int32_t
ret
;
gSyncEnv
=
(
SSyncEnv
*
)
malloc
(
sizeof
(
SSyncEnv
));
...
...
@@ -31,6 +39,40 @@ int32_t syncEnvStop() {
return
ret
;
}
static
int32_t
doSyncEnvStart
(
SSyncEnv
*
pSyncEnv
)
{
return
0
;
}
tmr_h
syncEnvStartTimer
(
TAOS_TMR_CALLBACK
fp
,
int
mseconds
,
void
*
param
)
{
return
doSyncEnvStartTimer
(
gSyncEnv
,
fp
,
mseconds
,
param
);
}
void
syncEnvStopTimer
(
tmr_h
*
pTimer
)
{
doSyncEnvStopTimer
(
gSyncEnv
,
pTimer
);
}
// local function -----------------
static
void
syncEnvTick
(
void
*
param
,
void
*
tmrId
)
{
SSyncEnv
*
pSyncEnv
=
(
SSyncEnv
*
)
param
;
sTrace
(
"syncEnvTick ... name:%s "
,
pSyncEnv
->
name
);
pSyncEnv
->
pEnvTickTimer
=
taosTmrStart
(
syncEnvTick
,
1000
,
pSyncEnv
,
pSyncEnv
->
pTimerManager
);
}
static
int32_t
doSyncEnvStart
(
SSyncEnv
*
pSyncEnv
)
{
snprintf
(
pSyncEnv
->
name
,
sizeof
(
pSyncEnv
->
name
),
"SyncEnv_%p"
,
pSyncEnv
);
// start tmr thread
pSyncEnv
->
pTimerManager
=
taosTmrInit
(
1000
,
50
,
10000
,
"SYNC-ENV"
);
// pSyncEnv->pEnvTickTimer = taosTmrStart(syncEnvTick, 1000, pSyncEnv, pSyncEnv->pTimerManager);
sTrace
(
"SyncEnv start ok, name:%s"
,
pSyncEnv
->
name
);
return
0
;
}
static
int32_t
doSyncEnvStop
(
SSyncEnv
*
pSyncEnv
)
{
taosTmrCleanUp
(
pSyncEnv
->
pTimerManager
);
return
0
;
}
static
tmr_h
doSyncEnvStartTimer
(
SSyncEnv
*
pSyncEnv
,
TAOS_TMR_CALLBACK
fp
,
int
mseconds
,
void
*
param
)
{
return
taosTmrStart
(
fp
,
mseconds
,
pSyncEnv
,
pSyncEnv
->
pTimerManager
);
}
static
int32_t
doSyncEnvStop
(
SSyncEnv
*
pSyncEnv
)
{
return
0
;
}
static
void
doSyncEnvStopTimer
(
SSyncEnv
*
pSyncEnv
,
tmr_h
*
pTimer
)
{
}
source/libs/sync/src/syncIO.c
浏览文件 @
06bbae7d
...
...
@@ -22,118 +22,72 @@
SSyncIO
*
gSyncIO
=
NULL
;
int32_t
syncIOSendMsg
(
void
*
handle
,
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
)
{
return
0
;
}
int32_t
syncIOStart
()
{
return
0
;
}
int32_t
syncIOStop
()
{
return
0
;
}
static
void
syncTick
(
void
*
param
,
void
*
tmrId
)
{
SSyncIO
*
io
=
(
SSyncIO
*
)
param
;
sDebug
(
"syncTick ... "
);
SRpcMsg
rpcMsg
;
rpcMsg
.
pCont
=
rpcMallocCont
(
10
);
snprintf
(
rpcMsg
.
pCont
,
10
,
"TICK"
);
rpcMsg
.
contLen
=
10
;
rpcMsg
.
handle
=
NULL
;
rpcMsg
.
msgType
=
2
;
SRpcMsg
*
pTemp
;
pTemp
=
taosAllocateQitem
(
sizeof
(
SRpcMsg
));
memcpy
(
pTemp
,
&
rpcMsg
,
sizeof
(
SRpcMsg
));
taosWriteQitem
(
io
->
pMsgQ
,
pTemp
);
io
->
syncTimer
=
taosTmrStart
(
syncTick
,
1000
,
io
,
io
->
syncTimerManager
);
// local function ------------
static
int32_t
syncIOStartInternal
(
SSyncIO
*
io
);
static
int32_t
syncIOStopInternal
(
SSyncIO
*
io
);
static
SSyncIO
*
syncIOCreate
(
char
*
host
,
uint16_t
port
);
static
int32_t
syncIODestroy
(
SSyncIO
*
io
);
static
void
*
syncIOConsumerFunc
(
void
*
param
);
static
int
syncIOAuth
(
void
*
parent
,
char
*
meterId
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
);
static
void
syncIOProcessRequest
(
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
static
void
syncIOProcessReply
(
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
static
int32_t
syncIOTickQInternal
(
SSyncIO
*
io
);
static
void
syncIOTickQFunc
(
void
*
param
,
void
*
tmrId
);
static
int32_t
syncIOTickPingInternal
(
SSyncIO
*
io
);
static
void
syncIOTickPingFunc
(
void
*
param
,
void
*
tmrId
);
// ----------------------------
// public function ------------
int32_t
syncIOSendMsg
(
void
*
clientRpc
,
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
)
{
sTrace
(
"<--- syncIOSendMsg ---> clientRpc:%p, numOfEps:%d, inUse:%d, destAddr:%s-%u, pMsg->ahandle:%p, pMsg->handle:%p, "
"pMsg->msgType:%d, pMsg->contLen:%d"
,
clientRpc
,
pEpSet
->
numOfEps
,
pEpSet
->
inUse
,
pEpSet
->
eps
[
0
].
fqdn
,
pEpSet
->
eps
[
0
].
port
,
pMsg
->
ahandle
,
pMsg
->
handle
,
pMsg
->
msgType
,
pMsg
->
contLen
);
pMsg
->
handle
=
NULL
;
rpcSendRequest
(
clientRpc
,
pEpSet
,
pMsg
,
NULL
);
return
0
;
}
void
*
syncConsumer
(
void
*
param
)
{
SSyncIO
*
io
=
param
;
STaosQall
*
qall
;
SRpcMsg
*
pRpcMsg
,
rpcMsg
;
int
type
;
qall
=
taosAllocateQall
();
int32_t
syncIOStart
(
char
*
host
,
uint16_t
port
)
{
gSyncIO
=
syncIOCreate
(
host
,
port
);
assert
(
gSyncIO
!=
NULL
);
while
(
1
)
{
int
numOfMsgs
=
taosReadAllQitemsFromQset
(
io
->
pQset
,
qall
,
NULL
,
NULL
);
sDebug
(
"%d sync-io msgs are received"
,
numOfMsgs
);
if
(
numOfMsgs
<=
0
)
break
;
for
(
int
i
=
0
;
i
<
numOfMsgs
;
++
i
)
{
taosGetQitem
(
qall
,
(
void
**
)
&
pRpcMsg
);
sDebug
(
"sync-io recv type:%d msg:%s"
,
pRpcMsg
->
msgType
,
(
char
*
)(
pRpcMsg
->
pCont
));
}
taosResetQitems
(
qall
);
for
(
int
i
=
0
;
i
<
numOfMsgs
;
++
i
)
{
taosGetQitem
(
qall
,
(
void
**
)
&
pRpcMsg
);
rpcFreeCont
(
pRpcMsg
->
pCont
);
if
(
pRpcMsg
->
handle
!=
NULL
)
{
int
msgSize
=
128
;
memset
(
&
rpcMsg
,
0
,
sizeof
(
rpcMsg
));
rpcMsg
.
pCont
=
rpcMallocCont
(
msgSize
);
rpcMsg
.
contLen
=
msgSize
;
rpcMsg
.
handle
=
pRpcMsg
->
handle
;
rpcMsg
.
code
=
0
;
rpcSendResponse
(
&
rpcMsg
);
}
int32_t
ret
=
syncIOStartInternal
(
gSyncIO
);
assert
(
ret
==
0
);
taosFreeQitem
(
pRpcMsg
);
}
}
taosFreeQall
(
qall
);
return
NULL
;
sTrace
(
"syncIOStart ok, gSyncIO:%p gSyncIO->clientRpc:%p"
,
gSyncIO
,
gSyncIO
->
clientRpc
);
return
0
;
}
static
int
retrieveAuthInfo
(
void
*
parent
,
char
*
meterId
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
)
{
// app shall retrieve the auth info based on meterID from DB or a data file
// demo code here only for simple demo
int
ret
=
0
;
return
ret
;
}
int32_t
syncIOStop
()
{
int32_t
ret
=
syncIOStopInternal
(
gSyncIO
);
assert
(
ret
==
0
);
static
void
processResponse
(
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
sDebug
(
"processResponse ... "
);
r
pcFreeCont
(
pMsg
->
pCont
)
;
ret
=
syncIODestroy
(
gSyncIO
);
assert
(
ret
==
0
);
r
eturn
ret
;
}
static
void
processRequestMsg
(
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
SSyncIO
*
io
=
pParent
;
SRpcMsg
*
pTemp
;
pTemp
=
taosAllocateQitem
(
sizeof
(
SRpcMsg
));
memcpy
(
pTemp
,
pMsg
,
sizeof
(
SRpcMsg
));
sDebug
(
"request is received, type:%d, contLen:%d, item:%p"
,
pMsg
->
msgType
,
pMsg
->
contLen
,
pTemp
);
taosWriteQitem
(
io
->
pMsgQ
,
pTemp
);
int32_t
syncIOTickQ
()
{
int32_t
ret
=
syncIOTickQInternal
(
gSyncIO
);
assert
(
ret
==
0
);
return
ret
;
}
SSyncIO
*
syncIOCreate
()
{
SSyncIO
*
io
=
(
SSyncIO
*
)
malloc
(
sizeof
(
SSyncIO
));
memset
(
io
,
0
,
sizeof
(
*
io
));
io
->
pMsgQ
=
taosOpenQueue
();
io
->
pQset
=
taosOpenQset
();
taosAddIntoQset
(
io
->
pQset
,
io
->
pMsgQ
,
NULL
);
io
->
start
=
doSyncIOStart
;
io
->
stop
=
doSyncIOStop
;
io
->
ping
=
doSyncIOPing
;
io
->
onMsg
=
doSyncIOOnMsg
;
io
->
destroy
=
doSyncIODestroy
;
return
io
;
int32_t
syncIOTickPing
()
{
int32_t
ret
=
syncIOTickPingInternal
(
gSyncIO
);
assert
(
ret
==
0
);
return
ret
;
}
static
int32_t
doSyncIOStart
(
SSyncIO
*
io
)
{
// local function ------------
static
int32_t
syncIOStartInternal
(
SSyncIO
*
io
)
{
taosBlockSIGPIPE
();
rpcInit
();
tsRpcForceTcp
=
1
;
// cient rpc init
...
...
@@ -143,7 +97,7 @@ static int32_t doSyncIOStart(SSyncIO *io) {
rpcInit
.
localPort
=
0
;
rpcInit
.
label
=
"SYNC-IO-CLIENT"
;
rpcInit
.
numOfThreads
=
1
;
rpcInit
.
cfp
=
processResponse
;
rpcInit
.
cfp
=
syncIOProcessReply
;
rpcInit
.
sessions
=
100
;
rpcInit
.
idleTime
=
100
;
rpcInit
.
user
=
"sync-io"
;
...
...
@@ -163,13 +117,13 @@ static int32_t doSyncIOStart(SSyncIO *io) {
{
SRpcInit
rpcInit
;
memset
(
&
rpcInit
,
0
,
sizeof
(
rpcInit
));
rpcInit
.
localPort
=
38000
;
rpcInit
.
localPort
=
io
->
myAddr
.
eps
[
0
].
port
;
rpcInit
.
label
=
"SYNC-IO-SERVER"
;
rpcInit
.
numOfThreads
=
1
;
rpcInit
.
cfp
=
processRequestMsg
;
rpcInit
.
cfp
=
syncIOProcessRequest
;
rpcInit
.
sessions
=
1000
;
rpcInit
.
idleTime
=
2
*
1500
;
rpcInit
.
afp
=
retrieveAuthInfo
;
rpcInit
.
afp
=
syncIOAuth
;
rpcInit
.
parent
=
io
;
rpcInit
.
connType
=
TAOS_CONN_SERVER
;
...
...
@@ -180,12 +134,9 @@ static int32_t doSyncIOStart(SSyncIO *io) {
}
}
io
->
epSet
.
inUse
=
0
;
addEpIntoEpSet
(
&
io
->
epSet
,
"127.0.0.1"
,
38000
);
// start consumer thread
{
if
(
pthread_create
(
&
io
->
tid
,
NULL
,
syncConsumer
,
io
)
!=
0
)
{
if
(
pthread_create
(
&
io
->
consumerTid
,
NULL
,
syncIOConsumerFunc
,
io
)
!=
0
)
{
sError
(
"failed to create sync consumer thread since %s"
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
...
...
@@ -193,35 +144,32 @@ static int32_t doSyncIOStart(SSyncIO *io) {
}
// start tmr thread
io
->
syncTimerManager
=
taosTmrInit
(
1000
,
50
,
10000
,
"SYNC"
);
io
->
syncTimer
=
taosTmrStart
(
syncTick
,
1000
,
io
,
io
->
syncTimerManager
);
io
->
ioTimerManager
=
taosTmrInit
(
1000
,
50
,
10000
,
"SYNC"
);
return
0
;
}
static
int32_t
doSyncIOStop
(
SSyncIO
*
io
)
{
static
int32_t
syncIOStopInternal
(
SSyncIO
*
io
)
{
atomic_store_8
(
&
io
->
isStart
,
0
);
pthread_join
(
io
->
t
id
,
NULL
);
pthread_join
(
io
->
consumerT
id
,
NULL
);
return
0
;
}
static
int32_t
doSyncIOPing
(
SSyncIO
*
io
)
{
SRpcMsg
rpcMsg
,
rspMsg
;
static
SSyncIO
*
syncIOCreate
(
char
*
host
,
uint16_t
port
)
{
SSyncIO
*
io
=
(
SSyncIO
*
)
malloc
(
sizeof
(
SSyncIO
));
memset
(
io
,
0
,
sizeof
(
*
io
));
rpcMsg
.
pCont
=
rpcMallocCont
(
10
);
snprintf
(
rpcMsg
.
pCont
,
10
,
"ping"
);
rpcMsg
.
contLen
=
10
;
rpcMsg
.
handle
=
NULL
;
rpcMsg
.
msgType
=
1
;
io
->
pMsgQ
=
taosOpenQueue
();
io
->
pQset
=
taosOpenQset
();
taosAddIntoQset
(
io
->
pQset
,
io
->
pMsgQ
,
NULL
);
rpcSendRequest
(
io
->
clientRpc
,
&
io
->
epSet
,
&
rpcMsg
,
NULL
);
io
->
myAddr
.
inUse
=
0
;
addEpIntoEpSet
(
&
io
->
myAddr
,
host
,
port
);
return
0
;
return
io
;
}
static
int32_t
doSyncIOOnMsg
(
struct
SSyncIO
*
io
,
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
return
0
;
}
static
int32_t
doSyncIODestroy
(
SSyncIO
*
io
)
{
static
int32_t
syncIODestroy
(
SSyncIO
*
io
)
{
int8_t
start
=
atomic_load_8
(
&
io
->
isStart
);
assert
(
start
==
0
);
...
...
@@ -235,15 +183,149 @@ static int32_t doSyncIODestroy(SSyncIO *io) {
io
->
clientRpc
=
NULL
;
}
if
(
io
->
pMsgQ
!=
NULL
)
{
free
(
io
->
pMsgQ
);
io
->
pMsgQ
=
NULL
;
}
taosCloseQueue
(
io
->
pMsgQ
);
taosCloseQset
(
io
->
pQset
);
if
(
io
->
pQset
!=
NULL
)
{
free
(
io
->
pQset
);
io
->
pQset
=
NULL
;
return
0
;
}
static
void
*
syncIOConsumerFunc
(
void
*
param
)
{
SSyncIO
*
io
=
param
;
STaosQall
*
qall
;
SRpcMsg
*
pRpcMsg
,
rpcMsg
;
int
type
;
qall
=
taosAllocateQall
();
while
(
1
)
{
int
numOfMsgs
=
taosReadAllQitemsFromQset
(
io
->
pQset
,
qall
,
NULL
,
NULL
);
sTrace
(
"syncIOConsumerFunc %d msgs are received"
,
numOfMsgs
);
if
(
numOfMsgs
<=
0
)
break
;
for
(
int
i
=
0
;
i
<
numOfMsgs
;
++
i
)
{
taosGetQitem
(
qall
,
(
void
**
)
&
pRpcMsg
);
sTrace
(
"syncIOConsumerFunc get item from queue: msgType:%d contLen:%d msg:%s"
,
pRpcMsg
->
msgType
,
pRpcMsg
->
contLen
,
(
char
*
)(
pRpcMsg
->
pCont
));
if
(
pRpcMsg
->
msgType
==
SYNC_PING
)
{
if
(
io
->
FpOnSyncPing
!=
NULL
)
{
SyncPing
*
pSyncMsg
;
SRpcMsg
tmpRpcMsg
;
memcpy
(
&
tmpRpcMsg
,
pRpcMsg
,
sizeof
(
SRpcMsg
));
pSyncMsg
=
syncPingBuild
(
tmpRpcMsg
.
contLen
);
syncPingFromRpcMsg
(
pRpcMsg
,
pSyncMsg
);
// memcpy(pSyncMsg, tmpRpcMsg.pCont, tmpRpcMsg.contLen);
io
->
FpOnSyncPing
(
io
->
pSyncNode
,
pSyncMsg
);
}
}
else
if
(
pRpcMsg
->
msgType
==
SYNC_PING_REPLY
)
{
SyncPingReply
*
pSyncMsg
=
syncPingReplyBuild
(
pRpcMsg
->
contLen
);
syncPingReplyFromRpcMsg
(
pRpcMsg
,
pSyncMsg
);
if
(
io
->
FpOnSyncPingReply
!=
NULL
)
{
io
->
FpOnSyncPingReply
(
io
->
pSyncNode
,
pSyncMsg
);
}
}
else
{
;
}
}
taosResetQitems
(
qall
);
for
(
int
i
=
0
;
i
<
numOfMsgs
;
++
i
)
{
taosGetQitem
(
qall
,
(
void
**
)
&
pRpcMsg
);
rpcFreeCont
(
pRpcMsg
->
pCont
);
if
(
pRpcMsg
->
handle
!=
NULL
)
{
int
msgSize
=
128
;
memset
(
&
rpcMsg
,
0
,
sizeof
(
rpcMsg
));
rpcMsg
.
pCont
=
rpcMallocCont
(
msgSize
);
rpcMsg
.
contLen
=
msgSize
;
snprintf
(
rpcMsg
.
pCont
,
rpcMsg
.
contLen
,
"%s"
,
"give a reply"
);
rpcMsg
.
handle
=
pRpcMsg
->
handle
;
rpcMsg
.
code
=
0
;
sTrace
(
"syncIOConsumerFunc rpcSendResponse ... msgType:%d contLen:%d"
,
pRpcMsg
->
msgType
,
rpcMsg
.
contLen
);
rpcSendResponse
(
&
rpcMsg
);
}
taosFreeQitem
(
pRpcMsg
);
}
}
taosFreeQall
(
qall
);
return
NULL
;
}
static
int
syncIOAuth
(
void
*
parent
,
char
*
meterId
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
)
{
// app shall retrieve the auth info based on meterID from DB or a data file
// demo code here only for simple demo
int
ret
=
0
;
return
ret
;
}
static
void
syncIOProcessRequest
(
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
sTrace
(
"<-- syncIOProcessRequest --> type:%d, contLen:%d, cont:%s"
,
pMsg
->
msgType
,
pMsg
->
contLen
,
(
char
*
)
pMsg
->
pCont
);
SSyncIO
*
io
=
pParent
;
SRpcMsg
*
pTemp
;
pTemp
=
taosAllocateQitem
(
sizeof
(
SRpcMsg
));
memcpy
(
pTemp
,
pMsg
,
sizeof
(
SRpcMsg
));
taosWriteQitem
(
io
->
pMsgQ
,
pTemp
);
}
static
void
syncIOProcessReply
(
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
sTrace
(
"syncIOProcessReply: type:%d, contLen:%d msg:%s"
,
pMsg
->
msgType
,
pMsg
->
contLen
,
(
char
*
)
pMsg
->
pCont
);
rpcFreeCont
(
pMsg
->
pCont
);
}
static
int32_t
syncIOTickQInternal
(
SSyncIO
*
io
)
{
io
->
ioTimerTickQ
=
taosTmrStart
(
syncIOTickQFunc
,
1000
,
io
,
io
->
ioTimerManager
);
return
0
;
}
static
void
syncIOTickQFunc
(
void
*
param
,
void
*
tmrId
)
{
SSyncIO
*
io
=
(
SSyncIO
*
)
param
;
sTrace
(
"<-- syncIOTickQFunc -->"
);
SRpcMsg
rpcMsg
;
rpcMsg
.
contLen
=
64
;
rpcMsg
.
pCont
=
rpcMallocCont
(
rpcMsg
.
contLen
);
snprintf
(
rpcMsg
.
pCont
,
rpcMsg
.
contLen
,
"%s"
,
"syncIOTickQ"
);
rpcMsg
.
handle
=
NULL
;
rpcMsg
.
msgType
=
55
;
SRpcMsg
*
pTemp
;
pTemp
=
taosAllocateQitem
(
sizeof
(
SRpcMsg
));
memcpy
(
pTemp
,
&
rpcMsg
,
sizeof
(
SRpcMsg
));
taosWriteQitem
(
io
->
pMsgQ
,
pTemp
);
taosTmrReset
(
syncIOTickQFunc
,
1000
,
io
,
io
->
ioTimerManager
,
&
io
->
ioTimerTickQ
);
}
static
int32_t
syncIOTickPingInternal
(
SSyncIO
*
io
)
{
io
->
ioTimerTickPing
=
taosTmrStart
(
syncIOTickPingFunc
,
1000
,
io
,
io
->
ioTimerManager
);
return
0
;
}
static
void
syncIOTickPingFunc
(
void
*
param
,
void
*
tmrId
)
{
SSyncIO
*
io
=
(
SSyncIO
*
)
param
;
sTrace
(
"<-- syncIOTickPingFunc -->"
);
SRpcMsg
rpcMsg
;
rpcMsg
.
contLen
=
64
;
rpcMsg
.
pCont
=
rpcMallocCont
(
rpcMsg
.
contLen
);
snprintf
(
rpcMsg
.
pCont
,
rpcMsg
.
contLen
,
"%s"
,
"syncIOTickPing"
);
rpcMsg
.
handle
=
NULL
;
rpcMsg
.
msgType
=
77
;
rpcSendRequest
(
io
->
clientRpc
,
&
io
->
myAddr
,
&
rpcMsg
,
NULL
);
taosTmrReset
(
syncIOTickPingFunc
,
1000
,
io
,
io
->
ioTimerManager
,
&
io
->
ioTimerTickPing
);
}
\ No newline at end of file
source/libs/sync/src/syncMain.c
浏览文件 @
06bbae7d
...
...
@@ -15,12 +15,36 @@
#include <stdint.h>
#include "sync.h"
#include "syncEnv.h"
#include "syncInt.h"
#include "syncRaft.h"
#include "syncUtil.h"
int32_t
syncInit
()
{
return
0
;
}
static
int32_t
tsNodeRefId
=
-
1
;
void
syncCleanUp
()
{}
// ------ local funciton ---------
static
int32_t
syncNodeSendMsgById
(
const
SRaftId
*
destRaftId
,
SSyncNode
*
pSyncNode
,
SRpcMsg
*
pMsg
);
static
int32_t
syncNodeSendMsgByInfo
(
const
SNodeInfo
*
nodeInfo
,
SSyncNode
*
pSyncNode
,
SRpcMsg
*
pMsg
);
static
void
syncNodePingTimerCb
(
void
*
param
,
void
*
tmrId
);
static
int32_t
syncNodePing
(
SSyncNode
*
pSyncNode
,
const
SRaftId
*
destRaftId
,
SyncPing
*
pMsg
);
static
int32_t
syncNodeRequestVote
(
SSyncNode
*
ths
,
const
SyncRequestVote
*
pMsg
);
static
int32_t
syncNodeAppendEntries
(
SSyncNode
*
ths
,
const
SyncAppendEntries
*
pMsg
);
static
int32_t
syncNodeOnPingCb
(
SSyncNode
*
ths
,
SyncPing
*
pMsg
);
static
int32_t
syncNodeOnPingReplyCb
(
SSyncNode
*
ths
,
SyncPingReply
*
pMsg
);
static
int32_t
syncNodeOnRequestVoteCb
(
SSyncNode
*
ths
,
SyncRequestVote
*
pMsg
);
static
int32_t
syncNodeOnRequestVoteReplyCb
(
SSyncNode
*
ths
,
SyncRequestVoteReply
*
pMsg
);
static
int32_t
syncNodeOnAppendEntriesCb
(
SSyncNode
*
ths
,
SyncAppendEntries
*
pMsg
);
static
int32_t
syncNodeOnAppendEntriesReplyCb
(
SSyncNode
*
ths
,
SyncAppendEntriesReply
*
pMsg
);
// ---------------------------------
int32_t
syncInit
()
{
sTrace
(
"syncInit ok"
);
return
0
;
}
void
syncCleanUp
()
{
sTrace
(
"syncCleanUp ok"
);
}
int64_t
syncStart
(
const
SSyncInfo
*
pSyncInfo
)
{
SSyncNode
*
pSyncNode
=
syncNodeOpen
(
pSyncInfo
);
...
...
@@ -32,7 +56,9 @@ void syncStop(int64_t rid) {}
int32_t
syncReconfig
(
int64_t
rid
,
const
SSyncCfg
*
pSyncCfg
)
{
return
0
;
}
int32_t
syncForwardToPeer
(
int64_t
rid
,
const
SSyncBuffer
*
pBuf
,
bool
isWeak
)
{
return
0
;
}
// int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak) { return 0; }
int32_t
syncForwardToPeer
(
int64_t
rid
,
const
SRpcMsg
*
pBuf
,
bool
isWeak
)
{
return
0
;
}
ESyncState
syncGetMyRole
(
int64_t
rid
)
{
return
TAOS_SYNC_STATE_LEADER
;
}
...
...
@@ -41,69 +67,238 @@ void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole) {}
SSyncNode
*
syncNodeOpen
(
const
SSyncInfo
*
pSyncInfo
)
{
SSyncNode
*
pSyncNode
=
(
SSyncNode
*
)
malloc
(
sizeof
(
SSyncNode
));
assert
(
pSyncNode
!=
NULL
);
memset
(
pSyncNode
,
0
,
sizeof
(
SSyncNode
));
pSyncNode
->
vgId
=
pSyncInfo
->
vgId
;
pSyncNode
->
syncCfg
=
pSyncInfo
->
syncCfg
;
memcpy
(
pSyncNode
->
path
,
pSyncInfo
->
path
,
sizeof
(
pSyncNode
->
path
));
pSyncNode
->
pFsm
=
pSyncInfo
->
pFsm
;
pSyncNode
->
rpcClient
=
pSyncInfo
->
rpcClient
;
pSyncNode
->
FpSendMsg
=
pSyncInfo
->
FpSendMsg
;
pSyncNode
->
FpPing
=
doSyncNodePing
;
pSyncNode
->
FpOnPing
=
onSyncNodePing
;
pSyncNode
->
FpOnPingReply
=
onSyncNodePingReply
;
pSyncNode
->
FpRequestVote
=
doSyncNodeRequestVote
;
pSyncNode
->
FpOnRequestVote
=
onSyncNodeRequestVote
;
pSyncNode
->
FpOnRequestVoteReply
=
onSyncNodeRequestVoteReply
;
pSyncNode
->
FpAppendEntries
=
doSyncNodeAppendEntries
;
pSyncNode
->
FpOnAppendEntries
=
onSyncNodeAppendEntries
;
pSyncNode
->
FpOnAppendEntriesReply
=
onSyncNodeAppendEntriesReply
;
pSyncNode
->
me
=
pSyncInfo
->
syncCfg
.
nodeInfo
[
pSyncInfo
->
syncCfg
.
myIndex
];
pSyncNode
->
peersNum
=
pSyncInfo
->
syncCfg
.
replicaNum
-
1
;
int
j
=
0
;
for
(
int
i
=
0
;
i
<
pSyncInfo
->
syncCfg
.
replicaNum
;
++
i
)
{
if
(
i
!=
pSyncInfo
->
syncCfg
.
myIndex
)
{
pSyncNode
->
peers
[
j
]
=
pSyncInfo
->
syncCfg
.
nodeInfo
[
i
];
j
++
;
}
}
pSyncNode
->
role
=
TAOS_SYNC_STATE_FOLLOWER
;
syncUtilnodeInfo2raftId
(
&
pSyncNode
->
me
,
pSyncNode
->
vgId
,
&
pSyncNode
->
raftId
);
pSyncNode
->
pPingTimer
=
NULL
;
pSyncNode
->
pingTimerMS
=
1000
;
atomic_store_8
(
&
pSyncNode
->
pingTimerStart
,
0
);
pSyncNode
->
FpPingTimer
=
syncNodePingTimerCb
;
pSyncNode
->
pingTimerCounter
=
0
;
pSyncNode
->
FpOnPing
=
syncNodeOnPingCb
;
pSyncNode
->
FpOnPingReply
=
syncNodeOnPingReplyCb
;
pSyncNode
->
FpOnRequestVote
=
syncNodeOnRequestVoteCb
;
pSyncNode
->
FpOnRequestVoteReply
=
syncNodeOnRequestVoteReplyCb
;
pSyncNode
->
FpOnAppendEntries
=
syncNodeOnAppendEntriesCb
;
pSyncNode
->
FpOnAppendEntriesReply
=
syncNodeOnAppendEntriesReplyCb
;
return
pSyncNode
;
}
void
syncNodeClose
(
SSyncNode
*
pSyncNode
)
{
assert
(
pSyncNode
!=
NULL
);
raftClose
(
pSyncNode
->
pRaft
);
free
(
pSyncNode
);
}
static
int32_t
doSyncNodePing
(
struct
SSyncNode
*
ths
,
const
SyncPing
*
pMsg
)
{
int32_t
ret
=
ths
->
pRaft
->
FpPing
(
ths
->
pRaft
,
pMsg
);
void
syncNodePingAll
(
SSyncNode
*
pSyncNode
)
{
sTrace
(
"syncNodePingAll pSyncNode:%p "
,
pSyncNode
);
int32_t
ret
=
0
;
for
(
int
i
=
0
;
i
<
pSyncNode
->
syncCfg
.
replicaNum
;
++
i
)
{
SRaftId
destId
;
syncUtilnodeInfo2raftId
(
&
pSyncNode
->
syncCfg
.
nodeInfo
[
i
],
pSyncNode
->
vgId
,
&
destId
);
SyncPing
*
pMsg
=
syncPingBuild3
(
&
pSyncNode
->
raftId
,
&
destId
);
ret
=
syncNodePing
(
pSyncNode
,
&
destId
,
pMsg
);
assert
(
ret
==
0
);
syncPingDestroy
(
pMsg
);
}
}
void
syncNodePingPeers
(
SSyncNode
*
pSyncNode
)
{
int32_t
ret
=
0
;
for
(
int
i
=
0
;
i
<
pSyncNode
->
peersNum
;
++
i
)
{
SRaftId
destId
;
syncUtilnodeInfo2raftId
(
&
pSyncNode
->
peers
[
i
],
pSyncNode
->
vgId
,
&
destId
);
SyncPing
*
pMsg
=
syncPingBuild3
(
&
pSyncNode
->
raftId
,
&
destId
);
ret
=
syncNodePing
(
pSyncNode
,
&
destId
,
pMsg
);
assert
(
ret
==
0
);
syncPingDestroy
(
pMsg
);
}
}
void
syncNodePingSelf
(
SSyncNode
*
pSyncNode
)
{
int32_t
ret
;
SyncPing
*
pMsg
=
syncPingBuild3
(
&
pSyncNode
->
raftId
,
&
pSyncNode
->
raftId
);
ret
=
syncNodePing
(
pSyncNode
,
&
pMsg
->
destId
,
pMsg
);
assert
(
ret
==
0
);
syncPingDestroy
(
pMsg
);
}
int32_t
syncNodeStartPingTimer
(
SSyncNode
*
pSyncNode
)
{
if
(
pSyncNode
->
pPingTimer
==
NULL
)
{
pSyncNode
->
pPingTimer
=
taosTmrStart
(
pSyncNode
->
FpPingTimer
,
pSyncNode
->
pingTimerCounter
,
pSyncNode
,
gSyncEnv
->
pTimerManager
);
}
else
{
taosTmrReset
(
pSyncNode
->
FpPingTimer
,
pSyncNode
->
pingTimerCounter
,
pSyncNode
,
gSyncEnv
->
pTimerManager
,
&
pSyncNode
->
pPingTimer
);
}
atomic_store_8
(
&
pSyncNode
->
pingTimerStart
,
1
);
return
0
;
}
int32_t
syncNodeStopPingTimer
(
SSyncNode
*
pSyncNode
)
{
atomic_store_8
(
&
pSyncNode
->
pingTimerStart
,
0
);
pSyncNode
->
pingTimerCounter
=
TIMER_MAX_MS
;
return
0
;
}
// ------ local funciton ---------
static
int32_t
syncNodePing
(
SSyncNode
*
pSyncNode
,
const
SRaftId
*
destRaftId
,
SyncPing
*
pMsg
)
{
sTrace
(
"syncNodePing pSyncNode:%p "
,
pSyncNode
);
int32_t
ret
=
0
;
SRpcMsg
rpcMsg
;
syncPing2RpcMsg
(
pMsg
,
&
rpcMsg
);
/*
SRpcMsg rpcMsg;
rpcMsg.contLen = 64;
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
snprintf((char*)rpcMsg.pCont, rpcMsg.contLen, "%s", "xxxxxxxxxxxxxx");
rpcMsg.handle = NULL;
rpcMsg.msgType = 1;
*/
syncNodeSendMsgById
(
destRaftId
,
pSyncNode
,
&
rpcMsg
);
{
cJSON
*
pJson
=
syncPing2Json
(
pMsg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
sTrace
(
"syncNodePing pMsg:%s "
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
{
SyncPing
*
pMsg2
=
rpcMsg
.
pCont
;
cJSON
*
pJson
=
syncPing2Json
(
pMsg2
);
char
*
serialized
=
cJSON_Print
(
pJson
);
sTrace
(
"syncNodePing rpcMsg.pCont:%s "
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
return
ret
;
}
static
int32_t
onSyncNodePing
(
struct
SSyncNode
*
ths
,
SyncPing
*
pMsg
)
{
int32_t
ret
=
ths
->
pRaft
->
FpOnPing
(
ths
->
pRaft
,
pMsg
)
;
static
int32_t
syncNodeRequestVote
(
SSyncNode
*
ths
,
const
SyncRequestVote
*
pMsg
)
{
int32_t
ret
=
0
;
return
ret
;
}
static
int32_t
onSyncNodePingReply
(
struct
SSyncNode
*
ths
,
SyncPingReply
*
pMsg
)
{
int32_t
ret
=
ths
->
pRaft
->
FpOnPingReply
(
ths
->
pRaft
,
pMsg
)
;
static
int32_t
syncNodeAppendEntries
(
SSyncNode
*
ths
,
const
SyncAppendEntries
*
pMsg
)
{
int32_t
ret
=
0
;
return
ret
;
}
static
int32_t
doSyncNodeRequestVote
(
struct
SSyncNode
*
ths
,
const
SyncRequestVote
*
pMsg
)
{
int32_t
ret
=
ths
->
pRaft
->
FpRequestVote
(
ths
->
pRaft
,
pMsg
);
static
int32_t
syncNodeSendMsgById
(
const
SRaftId
*
destRaftId
,
SSyncNode
*
pSyncNode
,
SRpcMsg
*
pMsg
)
{
sTrace
(
"syncNodeSendMsgById pSyncNode:%p "
,
pSyncNode
);
SEpSet
epSet
;
syncUtilraftId2EpSet
(
destRaftId
,
&
epSet
);
pSyncNode
->
FpSendMsg
(
pSyncNode
->
rpcClient
,
&
epSet
,
pMsg
);
return
0
;
}
static
int32_t
syncNodeSendMsgByInfo
(
const
SNodeInfo
*
nodeInfo
,
SSyncNode
*
pSyncNode
,
SRpcMsg
*
pMsg
)
{
SEpSet
epSet
;
syncUtilnodeInfo2EpSet
(
nodeInfo
,
&
epSet
);
pSyncNode
->
FpSendMsg
(
pSyncNode
->
rpcClient
,
&
epSet
,
pMsg
);
return
0
;
}
static
int32_t
syncNodeOnPingCb
(
SSyncNode
*
ths
,
SyncPing
*
pMsg
)
{
int32_t
ret
=
0
;
sTrace
(
"<-- syncNodeOnPingCb -->"
);
{
cJSON
*
pJson
=
syncPing2Json
(
pMsg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
sTrace
(
"syncNodeOnPingCb syncNodePing pMsg:%s "
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
SyncPingReply
*
pMsgReply
=
syncPingReplyBuild3
(
&
ths
->
raftId
,
&
pMsg
->
srcId
);
SRpcMsg
rpcMsg
;
syncPingReply2RpcMsg
(
pMsgReply
,
&
rpcMsg
);
syncNodeSendMsgById
(
&
pMsgReply
->
destId
,
ths
,
&
rpcMsg
);
return
ret
;
}
static
int32_t
onSyncNodeRequestVote
(
struct
SSyncNode
*
ths
,
SyncRequestVote
*
pMsg
)
{
int32_t
ret
=
ths
->
pRaft
->
FpOnRequestVote
(
ths
->
pRaft
,
pMsg
);
static
int32_t
syncNodeOnPingReplyCb
(
SSyncNode
*
ths
,
SyncPingReply
*
pMsg
)
{
int32_t
ret
=
0
;
sTrace
(
"<-- syncNodeOnPingReplyCb -->"
);
{
cJSON
*
pJson
=
syncPingReply2Json
(
pMsg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
sTrace
(
"syncNodeOnPingReplyCb syncNodePing pMsg:%s "
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
return
ret
;
}
static
int32_t
onSyncNodeRequestVoteReply
(
struct
SSyncNode
*
ths
,
SyncRequestVoteReply
*
pMsg
)
{
int32_t
ret
=
ths
->
pRaft
->
FpOnRequestVoteReply
(
ths
->
pRaft
,
pMsg
)
;
static
int32_t
syncNodeOnRequestVoteCb
(
SSyncNode
*
ths
,
SyncRequestVote
*
pMsg
)
{
int32_t
ret
=
0
;
return
ret
;
}
static
int32_t
doSyncNodeAppendEntries
(
struct
SSyncNode
*
ths
,
const
SyncAppendEntries
*
pMsg
)
{
int32_t
ret
=
ths
->
pRaft
->
FpAppendEntries
(
ths
->
pRaft
,
pMsg
)
;
static
int32_t
syncNodeOnRequestVoteReplyCb
(
SSyncNode
*
ths
,
SyncRequestVoteReply
*
pMsg
)
{
int32_t
ret
=
0
;
return
ret
;
}
static
int32_t
onSyncNodeAppendEntries
(
struct
SSyncNode
*
ths
,
SyncAppendEntries
*
pMsg
)
{
int32_t
ret
=
ths
->
pRaft
->
FpOnAppendEntries
(
ths
->
pRaft
,
pMsg
)
;
static
int32_t
syncNodeOnAppendEntriesCb
(
SSyncNode
*
ths
,
SyncAppendEntries
*
pMsg
)
{
int32_t
ret
=
0
;
return
ret
;
}
static
int32_t
onSyncNodeAppendEntriesReply
(
struct
SSyncNode
*
ths
,
SyncAppendEntriesReply
*
pMsg
)
{
int32_t
ret
=
ths
->
pRaft
->
FpOnAppendEntriesReply
(
ths
->
pRaft
,
pMsg
)
;
static
int32_t
syncNodeOnAppendEntriesReplyCb
(
SSyncNode
*
ths
,
SyncAppendEntriesReply
*
pMsg
)
{
int32_t
ret
=
0
;
return
ret
;
}
static
void
syncNodePingTimerCb
(
void
*
param
,
void
*
tmrId
)
{
SSyncNode
*
pSyncNode
=
(
SSyncNode
*
)
param
;
if
(
atomic_load_8
(
&
pSyncNode
->
pingTimerStart
))
{
++
(
pSyncNode
->
pingTimerCounter
);
// pSyncNode->pingTimerMS += 100;
sTrace
(
"syncNodePingTimerCb: pSyncNode->pingTimerCounter:%lu, pSyncNode->pingTimerMS:%d, pSyncNode->pPingTimer:%p, "
"tmrId:%p "
,
pSyncNode
->
pingTimerCounter
,
pSyncNode
->
pingTimerMS
,
pSyncNode
->
pPingTimer
,
tmrId
);
syncNodePingAll
(
pSyncNode
);
taosTmrReset
(
syncNodePingTimerCb
,
pSyncNode
->
pingTimerMS
,
pSyncNode
,
&
gSyncEnv
->
pTimerManager
,
&
pSyncNode
->
pPingTimer
);
}
else
{
sTrace
(
"syncNodePingTimerCb: pingTimerStart:%u "
,
pSyncNode
->
pingTimerStart
);
}
}
\ No newline at end of file
source/libs/sync/src/syncMessage.c
浏览文件 @
06bbae7d
...
...
@@ -15,5 +15,265 @@
#include "syncMessage.h"
#include "syncRaft.h"
#include "syncUtil.h"
#include "tcoding.h"
void
onMessage
(
SRaft
*
pRaft
,
void
*
pMsg
)
{}
\ No newline at end of file
void
onMessage
(
SRaft
*
pRaft
,
void
*
pMsg
)
{}
// ---- message process SyncPing----
SyncPing
*
syncPingBuild
(
uint32_t
dataLen
)
{
uint32_t
bytes
=
SYNC_PING_FIX_LEN
+
dataLen
;
SyncPing
*
pMsg
=
malloc
(
bytes
);
memset
(
pMsg
,
0
,
bytes
);
pMsg
->
bytes
=
bytes
;
pMsg
->
msgType
=
SYNC_PING
;
pMsg
->
dataLen
=
dataLen
;
}
void
syncPingDestroy
(
SyncPing
*
pMsg
)
{
if
(
pMsg
!=
NULL
)
{
free
(
pMsg
);
}
}
void
syncPingSerialize
(
const
SyncPing
*
pMsg
,
char
*
buf
,
uint32_t
bufLen
)
{
assert
(
pMsg
->
bytes
<=
bufLen
);
memcpy
(
buf
,
pMsg
,
pMsg
->
bytes
);
}
void
syncPingDeserialize
(
const
char
*
buf
,
uint32_t
len
,
SyncPing
*
pMsg
)
{
memcpy
(
pMsg
,
buf
,
len
);
assert
(
len
==
pMsg
->
bytes
);
assert
(
pMsg
->
bytes
==
SYNC_PING_FIX_LEN
+
pMsg
->
dataLen
);
}
void
syncPing2RpcMsg
(
const
SyncPing
*
pMsg
,
SRpcMsg
*
pRpcMsg
)
{
memset
(
pRpcMsg
,
0
,
sizeof
(
*
pRpcMsg
));
pRpcMsg
->
msgType
=
pMsg
->
msgType
;
pRpcMsg
->
contLen
=
pMsg
->
bytes
;
pRpcMsg
->
pCont
=
rpcMallocCont
(
pRpcMsg
->
contLen
);
syncPingSerialize
(
pMsg
,
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
);
}
void
syncPingFromRpcMsg
(
const
SRpcMsg
*
pRpcMsg
,
SyncPing
*
pMsg
)
{
syncPingDeserialize
(
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
,
pMsg
);
}
cJSON
*
syncPing2Json
(
const
SyncPing
*
pMsg
)
{
cJSON
*
pRoot
=
cJSON_CreateObject
();
cJSON_AddNumberToObject
(
pRoot
,
"bytes"
,
pMsg
->
bytes
);
cJSON_AddNumberToObject
(
pRoot
,
"msgType"
,
pMsg
->
msgType
);
cJSON
*
pSrcId
=
cJSON_CreateObject
();
cJSON_AddNumberToObject
(
pSrcId
,
"addr"
,
pMsg
->
srcId
.
addr
);
{
uint64_t
u64
=
pMsg
->
srcId
.
addr
;
cJSON
*
pTmp
=
pSrcId
;
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
(
u64
,
host
,
sizeof
(
host
),
&
port
);
cJSON_AddStringToObject
(
pTmp
,
"addr_host"
,
host
);
cJSON_AddNumberToObject
(
pTmp
,
"addr_port"
,
port
);
}
cJSON_AddNumberToObject
(
pSrcId
,
"vgId"
,
pMsg
->
srcId
.
vgId
);
cJSON_AddItemToObject
(
pRoot
,
"srcId"
,
pSrcId
);
cJSON
*
pDestId
=
cJSON_CreateObject
();
cJSON_AddNumberToObject
(
pDestId
,
"addr"
,
pMsg
->
destId
.
addr
);
{
uint64_t
u64
=
pMsg
->
destId
.
addr
;
cJSON
*
pTmp
=
pDestId
;
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
(
u64
,
host
,
sizeof
(
host
),
&
port
);
cJSON_AddStringToObject
(
pTmp
,
"addr_host"
,
host
);
cJSON_AddNumberToObject
(
pTmp
,
"addr_port"
,
port
);
}
cJSON_AddNumberToObject
(
pDestId
,
"vgId"
,
pMsg
->
destId
.
vgId
);
cJSON_AddItemToObject
(
pRoot
,
"destId"
,
pDestId
);
cJSON_AddNumberToObject
(
pRoot
,
"dataLen"
,
pMsg
->
dataLen
);
cJSON_AddStringToObject
(
pRoot
,
"data"
,
pMsg
->
data
);
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON_AddItemToObject
(
pJson
,
"SyncPing"
,
pRoot
);
return
pJson
;
}
SyncPing
*
syncPingBuild2
(
const
SRaftId
*
srcId
,
const
SRaftId
*
destId
,
const
char
*
str
)
{
uint32_t
dataLen
=
strlen
(
str
)
+
1
;
SyncPing
*
pMsg
=
syncPingBuild
(
dataLen
);
pMsg
->
srcId
=
*
srcId
;
pMsg
->
destId
=
*
destId
;
snprintf
(
pMsg
->
data
,
pMsg
->
dataLen
,
"%s"
,
str
);
return
pMsg
;
}
SyncPing
*
syncPingBuild3
(
const
SRaftId
*
srcId
,
const
SRaftId
*
destId
)
{
SyncPing
*
pMsg
=
syncPingBuild2
(
srcId
,
destId
,
"ping"
);
return
pMsg
;
}
// ---- message process SyncPingReply----
SyncPingReply
*
syncPingReplyBuild
(
uint32_t
dataLen
)
{
uint32_t
bytes
=
SYNC_PING_REPLY_FIX_LEN
+
dataLen
;
SyncPingReply
*
pMsg
=
malloc
(
bytes
);
memset
(
pMsg
,
0
,
bytes
);
pMsg
->
bytes
=
bytes
;
pMsg
->
msgType
=
SYNC_PING_REPLY
;
pMsg
->
dataLen
=
dataLen
;
}
void
syncPingReplyDestroy
(
SyncPingReply
*
pMsg
)
{
if
(
pMsg
!=
NULL
)
{
free
(
pMsg
);
}
}
void
syncPingReplySerialize
(
const
SyncPingReply
*
pMsg
,
char
*
buf
,
uint32_t
bufLen
)
{
assert
(
pMsg
->
bytes
<=
bufLen
);
memcpy
(
buf
,
pMsg
,
pMsg
->
bytes
);
}
void
syncPingReplyDeserialize
(
const
char
*
buf
,
uint32_t
len
,
SyncPingReply
*
pMsg
)
{
memcpy
(
pMsg
,
buf
,
len
);
assert
(
len
==
pMsg
->
bytes
);
assert
(
pMsg
->
bytes
==
SYNC_PING_FIX_LEN
+
pMsg
->
dataLen
);
}
void
syncPingReply2RpcMsg
(
const
SyncPingReply
*
pMsg
,
SRpcMsg
*
pRpcMsg
)
{
memset
(
pRpcMsg
,
0
,
sizeof
(
*
pRpcMsg
));
pRpcMsg
->
msgType
=
pMsg
->
msgType
;
pRpcMsg
->
contLen
=
pMsg
->
bytes
;
pRpcMsg
->
pCont
=
rpcMallocCont
(
pRpcMsg
->
contLen
);
syncPingReplySerialize
(
pMsg
,
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
);
}
void
syncPingReplyFromRpcMsg
(
const
SRpcMsg
*
pRpcMsg
,
SyncPingReply
*
pMsg
)
{
syncPingReplyDeserialize
(
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
,
pMsg
);
}
cJSON
*
syncPingReply2Json
(
const
SyncPingReply
*
pMsg
)
{
cJSON
*
pRoot
=
cJSON_CreateObject
();
cJSON_AddNumberToObject
(
pRoot
,
"bytes"
,
pMsg
->
bytes
);
cJSON_AddNumberToObject
(
pRoot
,
"msgType"
,
pMsg
->
msgType
);
cJSON
*
pSrcId
=
cJSON_CreateObject
();
cJSON_AddNumberToObject
(
pSrcId
,
"addr"
,
pMsg
->
srcId
.
addr
);
{
uint64_t
u64
=
pMsg
->
srcId
.
addr
;
cJSON
*
pTmp
=
pSrcId
;
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
(
u64
,
host
,
sizeof
(
host
),
&
port
);
cJSON_AddStringToObject
(
pTmp
,
"addr_host"
,
host
);
cJSON_AddNumberToObject
(
pTmp
,
"addr_port"
,
port
);
}
cJSON_AddNumberToObject
(
pSrcId
,
"vgId"
,
pMsg
->
srcId
.
vgId
);
cJSON_AddItemToObject
(
pRoot
,
"srcId"
,
pSrcId
);
cJSON
*
pDestId
=
cJSON_CreateObject
();
cJSON_AddNumberToObject
(
pDestId
,
"addr"
,
pMsg
->
destId
.
addr
);
{
uint64_t
u64
=
pMsg
->
destId
.
addr
;
cJSON
*
pTmp
=
pDestId
;
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
(
u64
,
host
,
sizeof
(
host
),
&
port
);
cJSON_AddStringToObject
(
pTmp
,
"addr_host"
,
host
);
cJSON_AddNumberToObject
(
pTmp
,
"addr_port"
,
port
);
}
cJSON_AddNumberToObject
(
pDestId
,
"vgId"
,
pMsg
->
destId
.
vgId
);
cJSON_AddItemToObject
(
pRoot
,
"destId"
,
pDestId
);
cJSON_AddNumberToObject
(
pRoot
,
"dataLen"
,
pMsg
->
dataLen
);
cJSON_AddStringToObject
(
pRoot
,
"data"
,
pMsg
->
data
);
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON_AddItemToObject
(
pJson
,
"SyncPingReply"
,
pRoot
);
return
pJson
;
}
SyncPingReply
*
syncPingReplyBuild2
(
const
SRaftId
*
srcId
,
const
SRaftId
*
destId
,
const
char
*
str
)
{
uint32_t
dataLen
=
strlen
(
str
)
+
1
;
SyncPingReply
*
pMsg
=
syncPingReplyBuild
(
dataLen
);
pMsg
->
srcId
=
*
srcId
;
pMsg
->
destId
=
*
destId
;
snprintf
(
pMsg
->
data
,
pMsg
->
dataLen
,
"%s"
,
str
);
return
pMsg
;
}
SyncPingReply
*
syncPingReplyBuild3
(
const
SRaftId
*
srcId
,
const
SRaftId
*
destId
)
{
SyncPingReply
*
pMsg
=
syncPingReplyBuild2
(
srcId
,
destId
,
"pang"
);
return
pMsg
;
}
#if 0
void syncPingSerialize(const SyncPing* pMsg, char** ppBuf, uint32_t* bufLen) {
*bufLen = sizeof(SyncPing) + pMsg->dataLen;
*ppBuf = (char*)malloc(*bufLen);
void* pStart = *ppBuf;
uint32_t allBytes = *bufLen;
int len = 0;
len = taosEncodeFixedU32(&pStart, pMsg->msgType);
allBytes -= len;
assert(len > 0);
pStart += len;
len = taosEncodeFixedU64(&pStart, pMsg->srcId.addr);
allBytes -= len;
assert(len > 0);
pStart += len;
len = taosEncodeFixedI32(&pStart, pMsg->srcId.vgId);
allBytes -= len;
assert(len > 0);
pStart += len;
len = taosEncodeFixedU64(&pStart, pMsg->destId.addr);
allBytes -= len;
assert(len > 0);
pStart += len;
len = taosEncodeFixedI32(&pStart, pMsg->destId.vgId);
allBytes -= len;
assert(len > 0);
pStart += len;
len = taosEncodeFixedU32(&pStart, pMsg->dataLen);
allBytes -= len;
assert(len > 0);
pStart += len;
memcpy(pStart, pMsg->data, pMsg->dataLen);
allBytes -= pMsg->dataLen;
assert(allBytes == 0);
}
void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg) {
void* pStart = (void*)buf;
uint64_t u64;
int32_t i32;
uint32_t u32;
pStart = taosDecodeFixedU64(pStart, &u64);
pMsg->msgType = u64;
pStart = taosDecodeFixedU64(pStart, &u64);
pMsg->srcId.addr = u64;
pStart = taosDecodeFixedI32(pStart, &i32);
pMsg->srcId.vgId = i32;
pStart = taosDecodeFixedU64(pStart, &u64);
pMsg->destId.addr = u64;
pStart = taosDecodeFixedI32(pStart, &i32);
pMsg->destId.vgId = i32;
pStart = taosDecodeFixedU32(pStart, &u32);
pMsg->dataLen = u32;
}
#endif
\ No newline at end of file
source/libs/sync/src/syncRaft.c
浏览文件 @
06bbae7d
...
...
@@ -16,6 +16,8 @@
#include "syncRaft.h"
#include "sync.h"
#if 0
SRaft* raftOpen(SRaftId raftId, SSyncFSM* pFsm) {
SRaft* pRaft = (SRaft*)malloc(sizeof(SRaft));
assert(pRaft != NULL);
...
...
@@ -64,3 +66,5 @@ static int32_t onRaftAppendEntriesReply(struct SRaft* ths, RaftAppendEntriesRepl
int32_t raftPropose(SRaft* pRaft, const SSyncBuffer* pBuf, bool isWeak) { return 0; }
static int raftSendMsg(SRaftId destRaftId, const void* pMsg, const SRaft* pRaft) { return 0; }
#endif
\ No newline at end of file
source/libs/sync/src/syncRaftStore.c
浏览文件 @
06bbae7d
...
...
@@ -18,23 +18,125 @@
// to complie success: FileIO interface is modified
SRaftStore
*
raftStoreOpen
(
const
char
*
path
)
{
return
NULL
;}
SRaftStore
*
raftStoreOpen
(
const
char
*
path
)
{
int32_t
ret
;
SRaftStore
*
pRaftStore
=
malloc
(
sizeof
(
SRaftStore
));
if
(
pRaftStore
==
NULL
)
{
sError
(
"raftStoreOpen malloc error"
);
return
NULL
;
}
memset
(
pRaftStore
,
0
,
sizeof
(
*
pRaftStore
));
snprintf
(
pRaftStore
->
path
,
sizeof
(
pRaftStore
->
path
),
"%s"
,
path
);
char
storeBuf
[
RAFT_STORE_BLOCK_SIZE
];
memset
(
storeBuf
,
0
,
sizeof
(
storeBuf
));
if
(
!
raftStoreFileExist
(
pRaftStore
->
path
))
{
ret
=
raftStoreInit
(
pRaftStore
);
assert
(
ret
==
0
);
}
pRaftStore
->
pFile
=
taosOpenFile
(
path
,
TD_FILE_READ
|
TD_FILE_WRITE
);
assert
(
pRaftStore
->
pFile
!=
NULL
);
int
len
=
taosReadFile
(
pRaftStore
->
pFile
,
storeBuf
,
RAFT_STORE_BLOCK_SIZE
);
assert
(
len
==
RAFT_STORE_BLOCK_SIZE
);
ret
=
raftStoreDeserialize
(
pRaftStore
,
storeBuf
,
len
);
assert
(
ret
==
0
);
return
pRaftStore
;
}
static
int32_t
raftStoreInit
(
SRaftStore
*
pRaftStore
)
{
assert
(
pRaftStore
!=
NULL
);
pRaftStore
->
pFile
=
taosOpenFile
(
pRaftStore
->
path
,
TD_FILE_CTEATE
|
TD_FILE_WRITE
);
assert
(
pRaftStore
->
pFile
!=
NULL
);
pRaftStore
->
currentTerm
=
0
;
pRaftStore
->
voteFor
.
addr
=
0
;
pRaftStore
->
voteFor
.
vgId
=
0
;
int32_t
ret
=
raftStorePersist
(
pRaftStore
);
assert
(
ret
==
0
);
taosCloseFile
(
&
pRaftStore
->
pFile
);
return
0
;
}
int32_t
raftStoreClose
(
SRaftStore
*
pRaftStore
)
{
assert
(
pRaftStore
!=
NULL
);
taosCloseFile
(
&
pRaftStore
->
pFile
);
free
(
pRaftStore
);
pRaftStore
=
NULL
;
return
0
;
}
int32_t
raftStorePersist
(
SRaftStore
*
pRaftStore
)
{
assert
(
pRaftStore
!=
NULL
);
int32_t
ret
;
char
storeBuf
[
RAFT_STORE_BLOCK_SIZE
];
ret
=
raftStoreSerialize
(
pRaftStore
,
storeBuf
,
sizeof
(
storeBuf
));
assert
(
ret
==
0
);
static
int32_t
raftStoreInit
(
SRaftStore
*
pRaftStore
)
{
return
0
;}
taosLSeekFile
(
pRaftStore
->
pFile
,
0
,
SEEK_SET
);
int32_t
raftStoreClose
(
SRaftStore
*
pRaftStore
)
{
return
0
;}
ret
=
taosWriteFile
(
pRaftStore
->
pFile
,
storeBuf
,
sizeof
(
storeBuf
));
assert
(
ret
==
RAFT_STORE_BLOCK_SIZE
);
int32_t
raftStorePersist
(
SRaftStore
*
pRaftStore
)
{
return
0
;}
taosFsyncFile
(
pRaftStore
->
pFile
);
return
0
;
}
static
bool
raftStoreFileExist
(
char
*
path
)
{
return
0
;
}
static
bool
raftStoreFileExist
(
char
*
path
)
{
return
taosStatFile
(
path
,
NULL
,
NULL
)
>=
0
;
}
int32_t
raftStoreSerialize
(
SRaftStore
*
pRaftStore
,
char
*
buf
,
size_t
len
)
{
return
0
;}
int32_t
raftStoreSerialize
(
SRaftStore
*
pRaftStore
,
char
*
buf
,
size_t
len
)
{
assert
(
pRaftStore
!=
NULL
);
int32_t
raftStoreDeserialize
(
SRaftStore
*
pRaftStore
,
char
*
buf
,
size_t
len
)
{
return
0
;}
cJSON
*
pRoot
=
cJSON_CreateObject
();
cJSON_AddNumberToObject
(
pRoot
,
"current_term"
,
pRaftStore
->
currentTerm
);
cJSON_AddNumberToObject
(
pRoot
,
"vote_for_addr"
,
pRaftStore
->
voteFor
.
addr
);
cJSON_AddNumberToObject
(
pRoot
,
"vote_for_vgid"
,
pRaftStore
->
voteFor
.
vgId
);
void
raftStorePrint
(
SRaftStore
*
pRaftStore
)
{}
char
*
serialized
=
cJSON_Print
(
pRoot
);
int
len2
=
strlen
(
serialized
);
assert
(
len2
<
len
);
memset
(
buf
,
0
,
len
);
snprintf
(
buf
,
len
,
"%s"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pRoot
);
return
0
;
}
int32_t
raftStoreDeserialize
(
SRaftStore
*
pRaftStore
,
char
*
buf
,
size_t
len
)
{
assert
(
pRaftStore
!=
NULL
);
assert
(
len
>
0
&&
len
<=
RAFT_STORE_BLOCK_SIZE
);
cJSON
*
pRoot
=
cJSON_Parse
(
buf
);
cJSON
*
pCurrentTerm
=
cJSON_GetObjectItem
(
pRoot
,
"current_term"
);
pRaftStore
->
currentTerm
=
pCurrentTerm
->
valueint
;
cJSON
*
pVoteForAddr
=
cJSON_GetObjectItem
(
pRoot
,
"vote_for_addr"
);
pRaftStore
->
voteFor
.
addr
=
pVoteForAddr
->
valueint
;
cJSON
*
pVoteForVgid
=
cJSON_GetObjectItem
(
pRoot
,
"vote_for_vgid"
);
pRaftStore
->
voteFor
.
vgId
=
pVoteForVgid
->
valueint
;
cJSON_Delete
(
pRoot
);
return
0
;
}
void
raftStorePrint
(
SRaftStore
*
pRaftStore
)
{
char
storeBuf
[
RAFT_STORE_BLOCK_SIZE
];
raftStoreSerialize
(
pRaftStore
,
storeBuf
,
sizeof
(
storeBuf
));
printf
(
"%s
\n
"
,
storeBuf
);
}
#if 0
...
...
source/libs/sync/src/syncUtil.c
0 → 100644
浏览文件 @
06bbae7d
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "syncUtil.h"
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
// ---- encode / decode
uint64_t
syncUtilAddr2U64
(
const
char
*
host
,
uint16_t
port
)
{
uint64_t
u64
;
uint32_t
hostU32
=
(
uint32_t
)
inet_addr
(
host
);
// assert(hostU32 != (uint32_t)-1);
u64
=
(((
uint64_t
)
hostU32
)
<<
32
)
|
(((
uint32_t
)
port
)
<<
16
);
return
u64
;
}
void
syncUtilU642Addr
(
uint64_t
u64
,
char
*
host
,
size_t
len
,
uint16_t
*
port
)
{
uint32_t
hostU32
=
(
uint32_t
)((
u64
>>
32
)
&
0x00000000FFFFFFFF
);
struct
in_addr
addr
;
addr
.
s_addr
=
hostU32
;
snprintf
(
host
,
len
,
"%s"
,
inet_ntoa
(
addr
));
*
port
=
(
uint16_t
)((
u64
&
0x00000000FFFF0000
)
>>
16
);
}
void
syncUtilnodeInfo2EpSet
(
const
SNodeInfo
*
pNodeInfo
,
SEpSet
*
pEpSet
)
{
pEpSet
->
inUse
=
0
;
pEpSet
->
numOfEps
=
0
;
addEpIntoEpSet
(
pEpSet
,
pNodeInfo
->
nodeFqdn
,
pNodeInfo
->
nodePort
);
}
void
syncUtilraftId2EpSet
(
const
SRaftId
*
raftId
,
SEpSet
*
pEpSet
)
{
char
host
[
TSDB_FQDN_LEN
];
uint16_t
port
;
syncUtilU642Addr
(
raftId
->
addr
,
host
,
sizeof
(
host
),
&
port
);
/*
pEpSet->numOfEps = 1;
pEpSet->inUse = 0;
pEpSet->eps[0].port = port;
snprintf(pEpSet->eps[0].fqdn, sizeof(pEpSet->eps[0].fqdn), "%s", host);
*/
pEpSet
->
inUse
=
0
;
pEpSet
->
numOfEps
=
0
;
addEpIntoEpSet
(
pEpSet
,
host
,
port
);
}
void
syncUtilnodeInfo2raftId
(
const
SNodeInfo
*
pNodeInfo
,
SyncGroupId
vgId
,
SRaftId
*
raftId
)
{
uint32_t
ipv4
=
taosGetIpv4FromFqdn
(
pNodeInfo
->
nodeFqdn
);
assert
(
ipv4
!=
0xFFFFFFFF
);
char
ipbuf
[
128
];
tinet_ntoa
(
ipbuf
,
ipv4
);
raftId
->
addr
=
syncUtilAddr2U64
(
ipbuf
,
pNodeInfo
->
nodePort
);
raftId
->
vgId
=
vgId
;
}
// ---- SSyncBuffer -----
#if 0
void syncUtilbufBuild(SSyncBuffer* syncBuf, size_t len) {
syncBuf->len = len;
syncBuf->data = malloc(syncBuf->len);
}
void syncUtilbufDestroy(SSyncBuffer* syncBuf) { free(syncBuf->data); }
void syncUtilbufCopy(const SSyncBuffer* src, SSyncBuffer* dest) {
dest->len = src->len;
dest->data = src->data;
}
void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest) {
dest->len = src->len;
dest->data = malloc(dest->len);
memcpy(dest->data, src->data, dest->len);
}
#endif
\ No newline at end of file
source/libs/sync/test/CMakeLists.txt
浏览文件 @
06bbae7d
add_executable
(
syncTest
""
)
add_executable
(
syncEnvTest
""
)
add_executable
(
syncPingTest
""
)
add_executable
(
syncEncodeTest
""
)
add_executable
(
syncIOTickQTest
""
)
add_executable
(
syncIOTickPingTest
""
)
add_executable
(
syncIOSendMsgTest
""
)
add_executable
(
syncIOSendMsgClientTest
""
)
add_executable
(
syncIOSendMsgServerTest
""
)
add_executable
(
syncRaftStoreTest
""
)
target_sources
(
syncTest
...
...
@@ -15,6 +22,34 @@ target_sources(syncPingTest
PRIVATE
"syncPingTest.cpp"
)
target_sources
(
syncEncodeTest
PRIVATE
"syncEncodeTest.cpp"
)
target_sources
(
syncIOTickQTest
PRIVATE
"syncIOTickQTest.cpp"
)
target_sources
(
syncIOTickPingTest
PRIVATE
"syncIOTickPingTest.cpp"
)
target_sources
(
syncIOSendMsgTest
PRIVATE
"syncIOSendMsgTest.cpp"
)
target_sources
(
syncIOSendMsgClientTest
PRIVATE
"syncIOSendMsgClientTest.cpp"
)
target_sources
(
syncIOSendMsgServerTest
PRIVATE
"syncIOSendMsgServerTest.cpp"
)
target_sources
(
syncRaftStoreTest
PRIVATE
"syncRaftStoreTest.cpp"
)
target_include_directories
(
syncTest
...
...
@@ -32,6 +67,41 @@ target_include_directories(syncPingTest
"
${
CMAKE_SOURCE_DIR
}
/include/libs/sync"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_include_directories
(
syncEncodeTest
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/libs/sync"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_include_directories
(
syncIOTickQTest
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/libs/sync"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_include_directories
(
syncIOTickPingTest
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/libs/sync"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_include_directories
(
syncIOSendMsgTest
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/libs/sync"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_include_directories
(
syncIOSendMsgClientTest
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/libs/sync"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_include_directories
(
syncIOSendMsgServerTest
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/libs/sync"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_include_directories
(
syncRaftStoreTest
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/libs/sync"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_link_libraries
(
syncTest
...
...
@@ -46,6 +116,34 @@ target_link_libraries(syncPingTest
sync
gtest_main
)
target_link_libraries
(
syncEncodeTest
sync
gtest_main
)
target_link_libraries
(
syncIOTickQTest
sync
gtest_main
)
target_link_libraries
(
syncIOTickPingTest
sync
gtest_main
)
target_link_libraries
(
syncIOSendMsgTest
sync
gtest_main
)
target_link_libraries
(
syncIOSendMsgClientTest
sync
gtest_main
)
target_link_libraries
(
syncIOSendMsgServerTest
sync
gtest_main
)
target_link_libraries
(
syncRaftStoreTest
sync
gtest_main
)
enable_testing
()
...
...
source/libs/sync/test/syncEncodeTest.cpp
0 → 100644
浏览文件 @
06bbae7d
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
void
logTest
()
{
sTrace
(
"--- sync log test: trace"
);
sDebug
(
"--- sync log test: debug"
);
sInfo
(
"--- sync log test: info"
);
sWarn
(
"--- sync log test: warn"
);
sError
(
"--- sync log test: error"
);
sFatal
(
"--- sync log test: fatal"
);
}
#define PING_MSG_LEN 20
void
test1
()
{
sTrace
(
"test1: ---- syncPingSerialize, syncPingDeserialize"
);
char
msg
[
PING_MSG_LEN
];
snprintf
(
msg
,
sizeof
(
msg
),
"%s"
,
"test ping"
);
SyncPing
*
pMsg
=
syncPingBuild
(
PING_MSG_LEN
);
pMsg
->
srcId
.
addr
=
1
;
pMsg
->
srcId
.
vgId
=
2
;
pMsg
->
destId
.
addr
=
3
;
pMsg
->
destId
.
vgId
=
4
;
memcpy
(
pMsg
->
data
,
msg
,
PING_MSG_LEN
);
{
cJSON
*
pJson
=
syncPing2Json
(
pMsg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"SyncPing:
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
uint32_t
bufLen
=
pMsg
->
bytes
;
char
*
buf
=
(
char
*
)
malloc
(
bufLen
);
syncPingSerialize
(
pMsg
,
buf
,
bufLen
);
SyncPing
*
pMsg2
=
(
SyncPing
*
)
malloc
(
pMsg
->
bytes
);
syncPingDeserialize
(
buf
,
bufLen
,
pMsg2
);
{
cJSON
*
pJson
=
syncPing2Json
(
pMsg2
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"SyncPing2:
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
syncPingDestroy
(
pMsg
);
syncPingDestroy
(
pMsg2
);
free
(
buf
);
}
void
test2
()
{
sTrace
(
"test2: ---- syncPing2RpcMsg, syncPingFromRpcMsg"
);
char
msg
[
PING_MSG_LEN
];
snprintf
(
msg
,
sizeof
(
msg
),
"%s"
,
"hello raft"
);
SyncPing
*
pMsg
=
syncPingBuild
(
PING_MSG_LEN
);
pMsg
->
srcId
.
addr
=
100
;
pMsg
->
srcId
.
vgId
=
200
;
pMsg
->
destId
.
addr
=
300
;
pMsg
->
destId
.
vgId
=
400
;
memcpy
(
pMsg
->
data
,
msg
,
PING_MSG_LEN
);
{
cJSON
*
pJson
=
syncPing2Json
(
pMsg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"SyncPing:
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
SRpcMsg
rpcMsg
;
syncPing2RpcMsg
(
pMsg
,
&
rpcMsg
);
SyncPing
*
pMsg2
=
(
SyncPing
*
)
malloc
(
pMsg
->
bytes
);
syncPingFromRpcMsg
(
&
rpcMsg
,
pMsg2
);
rpcFreeCont
(
rpcMsg
.
pCont
);
{
cJSON
*
pJson
=
syncPing2Json
(
pMsg2
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"SyncPing2:
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
syncPingDestroy
(
pMsg
);
syncPingDestroy
(
pMsg2
);
}
void
test3
()
{
sTrace
(
"test3: ---- syncPingReplySerialize, syncPingReplyDeserialize"
);
char
msg
[
PING_MSG_LEN
];
snprintf
(
msg
,
sizeof
(
msg
),
"%s"
,
"test ping"
);
SyncPingReply
*
pMsg
=
syncPingReplyBuild
(
PING_MSG_LEN
);
pMsg
->
srcId
.
addr
=
19
;
pMsg
->
srcId
.
vgId
=
29
;
pMsg
->
destId
.
addr
=
39
;
pMsg
->
destId
.
vgId
=
49
;
memcpy
(
pMsg
->
data
,
msg
,
PING_MSG_LEN
);
{
cJSON
*
pJson
=
syncPingReply2Json
(
pMsg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"SyncPingReply:
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
uint32_t
bufLen
=
pMsg
->
bytes
;
char
*
buf
=
(
char
*
)
malloc
(
bufLen
);
syncPingReplySerialize
(
pMsg
,
buf
,
bufLen
);
SyncPingReply
*
pMsg2
=
(
SyncPingReply
*
)
malloc
(
pMsg
->
bytes
);
syncPingReplyDeserialize
(
buf
,
bufLen
,
pMsg2
);
{
cJSON
*
pJson
=
syncPingReply2Json
(
pMsg2
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"SyncPingReply2:
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
syncPingReplyDestroy
(
pMsg
);
syncPingReplyDestroy
(
pMsg2
);
free
(
buf
);
}
void
test4
()
{
sTrace
(
"test4: ---- syncPingReply2RpcMsg, syncPingReplyFromRpcMsg"
);
char
msg
[
PING_MSG_LEN
];
snprintf
(
msg
,
sizeof
(
msg
),
"%s"
,
"hello raft"
);
SyncPingReply
*
pMsg
=
syncPingReplyBuild
(
PING_MSG_LEN
);
pMsg
->
srcId
.
addr
=
66
;
pMsg
->
srcId
.
vgId
=
77
;
pMsg
->
destId
.
addr
=
88
;
pMsg
->
destId
.
vgId
=
99
;
memcpy
(
pMsg
->
data
,
msg
,
PING_MSG_LEN
);
{
cJSON
*
pJson
=
syncPingReply2Json
(
pMsg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"SyncPingReply:
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
SRpcMsg
rpcMsg
;
syncPingReply2RpcMsg
(
pMsg
,
&
rpcMsg
);
SyncPingReply
*
pMsg2
=
(
SyncPingReply
*
)
malloc
(
pMsg
->
bytes
);
syncPingReplyFromRpcMsg
(
&
rpcMsg
,
pMsg2
);
rpcFreeCont
(
rpcMsg
.
pCont
);
{
cJSON
*
pJson
=
syncPingReply2Json
(
pMsg2
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"SyncPingReply2:
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
syncPingReplyDestroy
(
pMsg
);
syncPingReplyDestroy
(
pMsg2
);
}
int
main
()
{
// taosInitLog((char*)"syncPingTest.log", 100000, 10);
tsAsyncLog
=
0
;
sDebugFlag
=
143
+
64
;
test1
();
test2
();
test3
();
test4
();
return
0
;
}
source/libs/sync/test/syncEnvTest.cpp
浏览文件 @
06bbae7d
...
...
@@ -34,19 +34,20 @@ void doSync() {
}
int
main
()
{
//taosInitLog((char*)"syncEnvTest.log", 100000, 10);
//
taosInitLog((char*)"syncEnvTest.log", 100000, 10);
tsAsyncLog
=
0
;
sDebugFlag
=
143
+
64
;
int32_t
ret
;
logTest
();
int32_t
ret
=
syncIOStart
();
assert
(
ret
==
0
);
//
ret = syncIOStart();
//
assert(ret == 0);
ret
=
syncEnvStart
();
assert
(
ret
==
0
);
doSync
();
//
doSync();
while
(
1
)
{
taosMsleep
(
1000
);
...
...
source/libs/sync/test/syncIOSendMsgClientTest.cpp
0 → 100644
浏览文件 @
06bbae7d
#include <stdio.h>
#include "gtest/gtest.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
void
logTest
()
{
sTrace
(
"--- sync log test: trace"
);
sDebug
(
"--- sync log test: debug"
);
sInfo
(
"--- sync log test: info"
);
sWarn
(
"--- sync log test: warn"
);
sError
(
"--- sync log test: error"
);
sFatal
(
"--- sync log test: fatal"
);
}
int
main
()
{
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog
=
0
;
sDebugFlag
=
143
+
64
;
logTest
();
int32_t
ret
;
ret
=
syncIOStart
((
char
*
)
"127.0.0.1"
,
7010
);
assert
(
ret
==
0
);
for
(
int
i
=
0
;
i
<
10
;
++
i
)
{
SEpSet
epSet
;
epSet
.
inUse
=
0
;
epSet
.
numOfEps
=
0
;
addEpIntoEpSet
(
&
epSet
,
"127.0.0.1"
,
7030
);
SRpcMsg
rpcMsg
;
rpcMsg
.
contLen
=
64
;
rpcMsg
.
pCont
=
rpcMallocCont
(
rpcMsg
.
contLen
);
snprintf
((
char
*
)
rpcMsg
.
pCont
,
rpcMsg
.
contLen
,
"%s"
,
"syncIOSendMsgTest"
);
rpcMsg
.
handle
=
NULL
;
rpcMsg
.
msgType
=
77
;
syncIOSendMsg
(
gSyncIO
->
clientRpc
,
&
epSet
,
&
rpcMsg
);
sleep
(
1
);
}
while
(
1
)
{
sleep
(
1
);
}
return
0
;
}
source/libs/sync/test/syncIOSendMsgServerTest.cpp
0 → 100644
浏览文件 @
06bbae7d
#include <stdio.h>
#include "gtest/gtest.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
void
logTest
()
{
sTrace
(
"--- sync log test: trace"
);
sDebug
(
"--- sync log test: debug"
);
sInfo
(
"--- sync log test: info"
);
sWarn
(
"--- sync log test: warn"
);
sError
(
"--- sync log test: error"
);
sFatal
(
"--- sync log test: fatal"
);
}
int
main
()
{
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog
=
0
;
sDebugFlag
=
143
+
64
;
logTest
();
int32_t
ret
;
ret
=
syncIOStart
((
char
*
)
"127.0.0.1"
,
7030
);
assert
(
ret
==
0
);
while
(
1
)
{
sleep
(
1
);
}
return
0
;
}
source/libs/sync/test/syncIOSendMsgTest.cpp
0 → 100644
浏览文件 @
06bbae7d
#include <stdio.h>
#include "gtest/gtest.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
void
logTest
()
{
sTrace
(
"--- sync log test: trace"
);
sDebug
(
"--- sync log test: debug"
);
sInfo
(
"--- sync log test: info"
);
sWarn
(
"--- sync log test: warn"
);
sError
(
"--- sync log test: error"
);
sFatal
(
"--- sync log test: fatal"
);
}
int
main
()
{
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog
=
0
;
sDebugFlag
=
143
+
64
;
logTest
();
int32_t
ret
;
ret
=
syncIOStart
((
char
*
)
"127.0.0.1"
,
7010
);
assert
(
ret
==
0
);
for
(
int
i
=
0
;
i
<
10
;
++
i
)
{
SEpSet
epSet
;
epSet
.
inUse
=
0
;
epSet
.
numOfEps
=
0
;
addEpIntoEpSet
(
&
epSet
,
"127.0.0.1"
,
7010
);
SRpcMsg
rpcMsg
;
rpcMsg
.
contLen
=
64
;
rpcMsg
.
pCont
=
rpcMallocCont
(
rpcMsg
.
contLen
);
snprintf
((
char
*
)
rpcMsg
.
pCont
,
rpcMsg
.
contLen
,
"%s"
,
"syncIOSendMsgTest"
);
rpcMsg
.
handle
=
NULL
;
rpcMsg
.
msgType
=
77
;
syncIOSendMsg
(
gSyncIO
->
clientRpc
,
&
epSet
,
&
rpcMsg
);
sleep
(
1
);
}
while
(
1
)
{
sleep
(
1
);
}
return
0
;
}
source/libs/sync/test/syncIOTickPingTest.cpp
0 → 100644
浏览文件 @
06bbae7d
#include <stdio.h>
#include "gtest/gtest.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
void
logTest
()
{
sTrace
(
"--- sync log test: trace"
);
sDebug
(
"--- sync log test: debug"
);
sInfo
(
"--- sync log test: info"
);
sWarn
(
"--- sync log test: warn"
);
sError
(
"--- sync log test: error"
);
sFatal
(
"--- sync log test: fatal"
);
}
int
main
()
{
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog
=
0
;
sDebugFlag
=
143
+
64
;
logTest
();
int32_t
ret
;
ret
=
syncIOStart
((
char
*
)
"127.0.0.1"
,
7010
);
assert
(
ret
==
0
);
ret
=
syncIOTickPing
();
assert
(
ret
==
0
);
while
(
1
)
{
sleep
(
1
);
}
return
0
;
}
source/libs/sync/test/syncIOTickQTest.cpp
0 → 100644
浏览文件 @
06bbae7d
#include <stdio.h>
#include "gtest/gtest.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
void
logTest
()
{
sTrace
(
"--- sync log test: trace"
);
sDebug
(
"--- sync log test: debug"
);
sInfo
(
"--- sync log test: info"
);
sWarn
(
"--- sync log test: warn"
);
sError
(
"--- sync log test: error"
);
sFatal
(
"--- sync log test: fatal"
);
}
int
main
()
{
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog
=
0
;
sDebugFlag
=
143
+
64
;
logTest
();
int32_t
ret
;
ret
=
syncIOStart
((
char
*
)
"127.0.0.1"
,
7010
);
assert
(
ret
==
0
);
ret
=
syncIOTickQ
();
assert
(
ret
==
0
);
while
(
1
)
{
sleep
(
1
);
}
return
0
;
}
source/libs/sync/test/syncPingTest.cpp
浏览文件 @
06bbae7d
...
...
@@ -13,49 +13,71 @@ void logTest() {
sFatal
(
"--- sync log test: fatal"
);
}
void
doSync
()
{
SSyncNode
*
doSync
()
{
SSyncFSM
*
pFsm
;
SSyncInfo
syncInfo
;
syncInfo
.
vgId
=
1
;
syncInfo
.
rpcClient
=
gSyncIO
->
clientRpc
;
syncInfo
.
FpSendMsg
=
syncIOSendMsg
;
syncInfo
.
pFsm
=
pFsm
;
snprintf
(
syncInfo
.
path
,
sizeof
(
syncInfo
.
path
),
"%s"
,
"./test_sync_ping"
);
SSyncCfg
*
pCfg
=
&
syncInfo
.
syncCfg
;
pCfg
->
myIndex
=
0
;
pCfg
->
replicaNum
=
3
;
pCfg
->
replicaNum
=
2
;
pCfg
->
nodeInfo
[
0
].
nodePort
=
7010
;
taosGetFqdn
(
pCfg
->
nodeInfo
[
0
].
nodeFqdn
);
snprintf
(
pCfg
->
nodeInfo
[
0
].
nodeFqdn
,
sizeof
(
pCfg
->
nodeInfo
[
0
].
nodeFqdn
),
"%s"
,
"127.0.0.1"
);
// taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
pCfg
->
nodeInfo
[
1
].
nodePort
=
7110
;
taosGetFqdn
(
pCfg
->
nodeInfo
[
1
].
nodeFqdn
);
snprintf
(
pCfg
->
nodeInfo
[
1
].
nodeFqdn
,
sizeof
(
pCfg
->
nodeInfo
[
1
].
nodeFqdn
),
"%s"
,
"127.0.0.1"
);
// taosGetFqdn(pCfg->nodeInfo[1].nodeFqdn);
pCfg
->
nodeInfo
[
2
].
nodePort
=
7210
;
taosGetFqdn
(
pCfg
->
nodeInfo
[
2
].
nodeFqdn
);
snprintf
(
pCfg
->
nodeInfo
[
2
].
nodeFqdn
,
sizeof
(
pCfg
->
nodeInfo
[
2
].
nodeFqdn
),
"%s"
,
"127.0.0.1"
);
// taosGetFqdn(pCfg->nodeInfo[2].nodeFqdn);
SSyncNode
*
pSyncNode
=
syncNodeOpen
(
&
syncInfo
);
assert
(
pSyncNode
!=
NULL
);
gSyncIO
->
FpOnPing
=
pSyncNode
->
FpOnPing
;
gSyncIO
->
FpOn
Sync
Ping
=
pSyncNode
->
FpOnPing
;
gSyncIO
->
pSyncNode
=
pSyncNode
;
return
pSyncNode
;
}
void
timerPingAll
(
void
*
param
,
void
*
tmrId
)
{
SSyncNode
*
pSyncNode
=
(
SSyncNode
*
)
param
;
syncNodePingAll
(
pSyncNode
);
}
int
main
()
{
//taosInitLog((char*)"syncPingTest.log", 100000, 10);
//
taosInitLog((char*)"syncPingTest.log", 100000, 10);
tsAsyncLog
=
0
;
sDebugFlag
=
143
+
64
;
logTest
();
int32_t
ret
=
syncIOStart
();
int32_t
ret
=
syncIOStart
(
(
char
*
)
"127.0.0.1"
,
7010
);
assert
(
ret
==
0
);
ret
=
syncEnvStart
();
assert
(
ret
==
0
);
doSync
();
SSyncNode
*
pSyncNode
=
doSync
();
gSyncIO
->
FpOnSyncPing
=
pSyncNode
->
FpOnPing
;
gSyncIO
->
FpOnSyncPingReply
=
pSyncNode
->
FpOnPingReply
;
ret
=
syncNodeStartPingTimer
(
pSyncNode
);
assert
(
ret
==
0
);
/*
taosMsleep(10000);
ret = syncNodeStopPingTimer(pSyncNode);
assert(ret == 0);
*/
while
(
1
)
{
taosMsleep
(
1000
);
...
...
source/libs/sync/test/syncRaftStoreTest.cpp
0 → 100644
浏览文件 @
06bbae7d
#include "syncRaftStore.h"
#include <stdio.h>
#include "gtest/gtest.h"
#include "syncIO.h"
#include "syncInt.h"
void
*
pingFunc
(
void
*
param
)
{
SSyncIO
*
io
=
(
SSyncIO
*
)
param
;
while
(
1
)
{
sDebug
(
"io->ping"
);
// io->ping(io);
sleep
(
1
);
}
return
NULL
;
}
int
main
()
{
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog
=
0
;
sDebugFlag
=
143
+
64
;
sTrace
(
"sync log test: trace"
);
sDebug
(
"sync log test: debug"
);
sInfo
(
"sync log test: info"
);
sWarn
(
"sync log test: warn"
);
sError
(
"sync log test: error"
);
sFatal
(
"sync log test: fatal"
);
SRaftStore
*
pRaftStore
=
raftStoreOpen
(
"./raft_store.json"
);
assert
(
pRaftStore
!=
NULL
);
raftStorePrint
(
pRaftStore
);
pRaftStore
->
currentTerm
=
100
;
pRaftStore
->
voteFor
.
addr
=
200
;
pRaftStore
->
voteFor
.
vgId
=
300
;
raftStorePrint
(
pRaftStore
);
raftStorePersist
(
pRaftStore
);
return
0
;
}
source/libs/sync/test/syncTest.cpp
浏览文件 @
06bbae7d
#include <stdio.h>
#include "gtest/gtest.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
#include "gtest/gtest.h"
void
*
pingFunc
(
void
*
param
)
{
SSyncIO
*
io
=
(
SSyncIO
*
)
param
;
while
(
1
)
{
sDebug
(
"io->ping"
);
io
->
ping
(
io
);
//
io->ping(io);
sleep
(
1
);
}
return
NULL
;
}
int
main
()
{
//taosInitLog((char *)"syncTest.log", 100000, 10);
//
taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog
=
0
;
sDebugFlag
=
143
+
64
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录