Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
02f0f85a
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
02f0f85a
编写于
2月 28, 2022
作者:
M
Minghao Li
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
sync modify timer
上级
42d91c3d
变更
14
隐藏空白更改
内联
并排
Showing
14 changed file
with
187 addition
and
84 deletion
+187
-84
include/libs/sync/sync.h
include/libs/sync/sync.h
+3
-1
source/libs/sync/inc/syncEnv.h
source/libs/sync/inc/syncEnv.h
+6
-4
source/libs/sync/inc/syncIO.h
source/libs/sync/inc/syncIO.h
+0
-6
source/libs/sync/inc/syncInt.h
source/libs/sync/inc/syncInt.h
+20
-18
source/libs/sync/inc/syncRaft.h
source/libs/sync/inc/syncRaft.h
+4
-0
source/libs/sync/inc/syncRaftStore.h
source/libs/sync/inc/syncRaftStore.h
+2
-2
source/libs/sync/src/syncEnv.c
source/libs/sync/src/syncEnv.c
+45
-2
source/libs/sync/src/syncIO.c
source/libs/sync/src/syncIO.c
+42
-21
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+35
-12
source/libs/sync/src/syncRaft.c
source/libs/sync/src/syncRaft.c
+4
-0
source/libs/sync/src/syncRaftStore.c
source/libs/sync/src/syncRaftStore.c
+7
-9
source/libs/sync/test/syncEnvTest.cpp
source/libs/sync/test/syncEnvTest.cpp
+5
-4
source/libs/sync/test/syncPingTest.cpp
source/libs/sync/test/syncPingTest.cpp
+12
-3
source/libs/sync/test/syncTest.cpp
source/libs/sync/test/syncTest.cpp
+2
-2
未找到文件。
include/libs/sync/sync.h
浏览文件 @
02f0f85a
...
@@ -34,7 +34,9 @@ typedef enum {
...
@@ -34,7 +34,9 @@ typedef enum {
TAOS_SYNC_STATE_FOLLOWER
=
0
,
TAOS_SYNC_STATE_FOLLOWER
=
0
,
TAOS_SYNC_STATE_CANDIDATE
=
1
,
TAOS_SYNC_STATE_CANDIDATE
=
1
,
TAOS_SYNC_STATE_LEADER
=
2
,
TAOS_SYNC_STATE_LEADER
=
2
,
}
ESyncState
;
}
ESyncRole
;
typedef
ESyncRole
ESyncState
;
typedef
struct
SSyncBuffer
{
typedef
struct
SSyncBuffer
{
void
*
data
;
void
*
data
;
...
...
source/libs/sync/inc/syncEnv.h
浏览文件 @
02f0f85a
...
@@ -26,19 +26,21 @@ extern "C" {
...
@@ -26,19 +26,21 @@ extern "C" {
#include "syncInt.h"
#include "syncInt.h"
#include "taosdef.h"
#include "taosdef.h"
#include "trpc.h"
#include "trpc.h"
#include "ttimer.h"
typedef
struct
SSyncEnv
{
typedef
struct
SSyncEnv
{
void
*
pTimer
;
tmr_h
pEnvTickTimer
;
void
*
pTimerManager
;
tmr_h
pTimerManager
;
char
name
[
128
];
}
SSyncEnv
;
}
SSyncEnv
;
int32_t
syncEnvStart
();
int32_t
syncEnvStart
();
int32_t
syncEnvStop
();
int32_t
syncEnvStop
();
static
int32_t
doSyncEnvStart
(
SSyncEnv
*
pSyncEnv
);
tmr_h
syncEnvStartTimer
(
TAOS_TMR_CALLBACK
fp
,
int
mseconds
,
void
*
param
);
static
int32_t
doSyncEnvStop
(
SSyncEnv
*
pSyncEnv
);
void
syncEnvStopTimer
(
tmr_h
*
pTimer
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
...
...
source/libs/sync/inc/syncIO.h
浏览文件 @
02f0f85a
...
@@ -61,12 +61,6 @@ int32_t syncIOStop();
...
@@ -61,12 +61,6 @@ int32_t syncIOStop();
int32_t
syncIOSendMsg
(
void
*
handle
,
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
);
int32_t
syncIOSendMsg
(
void
*
handle
,
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
);
SSyncIO
*
syncIOCreate
();
SSyncIO
*
syncIOCreate
();
static
int32_t
doSyncIOStart
(
SSyncIO
*
io
);
static
int32_t
doSyncIOStop
(
SSyncIO
*
io
);
static
int32_t
doSyncIOPing
(
SSyncIO
*
io
);
static
int32_t
doSyncIOOnMsg
(
struct
SSyncIO
*
io
,
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
static
int32_t
doSyncIODestroy
(
SSyncIO
*
io
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
#endif
#endif
...
...
source/libs/sync/inc/syncInt.h
浏览文件 @
02f0f85a
...
@@ -26,6 +26,7 @@ extern "C" {
...
@@ -26,6 +26,7 @@ extern "C" {
#include "sync.h"
#include "sync.h"
#include "taosdef.h"
#include "taosdef.h"
#include "tlog.h"
#include "tlog.h"
#include "ttimer.h"
extern
int32_t
sDebugFlag
;
extern
int32_t
sDebugFlag
;
...
@@ -87,15 +88,28 @@ typedef struct SyncAppendEntries SyncAppendEntries;
...
@@ -87,15 +88,28 @@ typedef struct SyncAppendEntries SyncAppendEntries;
struct
SyncAppendEntriesReply
;
struct
SyncAppendEntriesReply
;
typedef
struct
SyncAppendEntriesReply
SyncAppendEntriesReply
;
typedef
struct
SyncAppendEntriesReply
SyncAppendEntriesReply
;
typedef
struct
SRaftId
{
SyncNodeId
addr
;
SyncGroupId
vgId
;
}
SRaftId
;
typedef
struct
SSyncNode
{
typedef
struct
SSyncNode
{
int8_t
replica
;
int8_t
replica
;
int8_t
quorum
;
int8_t
quorum
;
int32_t
refCount
;
int64_t
rid
;
SyncGroupId
vgId
;
SyncGroupId
vgId
;
SSyncCfg
syncCfg
;
SSyncCfg
syncCfg
;
char
path
[
TSDB_FILENAME_LEN
];
char
path
[
TSDB_FILENAME_LEN
];
SRaft
*
pRaft
;
ESyncRole
role
;
SRaftId
raftId
;
SSyncFSM
*
pFsm
;
tmr_h
pPingTimer
;
tmr_h
pElectionTimer
;
tmr_h
pHeartbeatTimer
;
int32_t
(
*
FpPing
)(
struct
SSyncNode
*
ths
,
const
SyncPing
*
pMsg
);
int32_t
(
*
FpPing
)(
struct
SSyncNode
*
ths
,
const
SyncPing
*
pMsg
);
...
@@ -123,23 +137,11 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo);
...
@@ -123,23 +137,11 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo);
void
syncNodeClose
(
SSyncNode
*
pSyncNode
);
void
syncNodeClose
(
SSyncNode
*
pSyncNode
);
static
int32_t
doSyncNodePing
(
struct
SSyncNode
*
ths
,
const
SyncPing
*
pMsg
);
void
syncNodePingAll
(
SSyncNode
*
pSyncNode
);
static
int32_t
onSyncNodePing
(
struct
SSyncNode
*
ths
,
SyncPing
*
pMsg
);
static
int32_t
onSyncNodePingReply
(
struct
SSyncNode
*
ths
,
SyncPingReply
*
pMsg
);
static
int32_t
doSyncNodeRequestVote
(
struct
SSyncNode
*
ths
,
const
SyncRequestVote
*
pMsg
);
static
int32_t
onSyncNodeRequestVote
(
struct
SSyncNode
*
ths
,
SyncRequestVote
*
pMsg
);
static
int32_t
onSyncNodeRequestVoteReply
(
struct
SSyncNode
*
ths
,
SyncRequestVoteReply
*
pMsg
);
static
int32_t
doSyncNodeAppendEntries
(
struct
SSyncNode
*
ths
,
const
SyncAppendEntries
*
pMsg
);
static
int32_t
onSyncNodeAppendEntries
(
struct
SSyncNode
*
ths
,
SyncAppendEntries
*
pMsg
);
void
syncNodePingPeers
(
SSyncNode
*
pSyncNode
);
static
int32_t
onSyncNodeAppendEntriesReply
(
struct
SSyncNode
*
ths
,
SyncAppendEntriesReply
*
pMsg
);
void
syncNodePingSelf
(
SSyncNode
*
pSyncNode
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
...
...
source/libs/sync/inc/syncRaft.h
浏览文件 @
02f0f85a
...
@@ -27,6 +27,8 @@ extern "C" {
...
@@ -27,6 +27,8 @@ extern "C" {
#include "syncMessage.h"
#include "syncMessage.h"
#include "taosdef.h"
#include "taosdef.h"
#if 0
typedef struct SRaftId {
typedef struct SRaftId {
SyncNodeId addr;
SyncNodeId addr;
SyncGroupId vgId;
SyncGroupId vgId;
...
@@ -82,6 +84,8 @@ int32_t raftPropose(SRaft* pRaft, const SSyncBuffer* pBuf, bool isWeak);
...
@@ -82,6 +84,8 @@ int32_t raftPropose(SRaft* pRaft, const SSyncBuffer* pBuf, bool isWeak);
static int raftSendMsg(SRaftId destRaftId, const void* pMsg, const SRaft* pRaft);
static int raftSendMsg(SRaftId destRaftId, const void* pMsg, const SRaft* pRaft);
#endif
#ifdef __cplusplus
#ifdef __cplusplus
}
}
#endif
#endif
...
...
source/libs/sync/inc/syncRaftStore.h
浏览文件 @
02f0f85a
...
@@ -34,8 +34,8 @@ extern "C" {
...
@@ -34,8 +34,8 @@ extern "C" {
typedef
struct
SRaftStore
{
typedef
struct
SRaftStore
{
SyncTerm
currentTerm
;
SyncTerm
currentTerm
;
SRaftId
voteFor
;
SRaftId
voteFor
;
//FileFd fd;
//
FileFd fd;
char
path
[
RAFT_STORE_PATH_LEN
];
char
path
[
RAFT_STORE_PATH_LEN
];
}
SRaftStore
;
}
SRaftStore
;
SRaftStore
*
raftStoreOpen
(
const
char
*
path
);
SRaftStore
*
raftStoreOpen
(
const
char
*
path
);
...
...
source/libs/sync/src/syncEnv.c
浏览文件 @
02f0f85a
...
@@ -18,6 +18,14 @@
...
@@ -18,6 +18,14 @@
SSyncEnv
*
gSyncEnv
=
NULL
;
SSyncEnv
*
gSyncEnv
=
NULL
;
// local function -----------------
static
void
syncEnvTick
(
void
*
param
,
void
*
tmrId
);
static
int32_t
doSyncEnvStart
(
SSyncEnv
*
pSyncEnv
);
static
int32_t
doSyncEnvStop
(
SSyncEnv
*
pSyncEnv
);
static
tmr_h
doSyncEnvStartTimer
(
SSyncEnv
*
pSyncEnv
,
TAOS_TMR_CALLBACK
fp
,
int
mseconds
,
void
*
param
);
static
void
doSyncEnvStopTimer
(
SSyncEnv
*
pSyncEnv
,
tmr_h
*
pTimer
);
// --------------------------------
int32_t
syncEnvStart
()
{
int32_t
syncEnvStart
()
{
int32_t
ret
;
int32_t
ret
;
gSyncEnv
=
(
SSyncEnv
*
)
malloc
(
sizeof
(
SSyncEnv
));
gSyncEnv
=
(
SSyncEnv
*
)
malloc
(
sizeof
(
SSyncEnv
));
...
@@ -31,6 +39,41 @@ int32_t syncEnvStop() {
...
@@ -31,6 +39,41 @@ int32_t syncEnvStop() {
return
ret
;
return
ret
;
}
}
static
int32_t
doSyncEnvStart
(
SSyncEnv
*
pSyncEnv
)
{
return
0
;
}
tmr_h
syncEnvStartTimer
(
TAOS_TMR_CALLBACK
fp
,
int
mseconds
,
void
*
param
)
{
return
doSyncEnvStartTimer
(
gSyncEnv
,
fp
,
mseconds
,
param
);
}
void
syncEnvStopTimer
(
tmr_h
*
pTimer
)
{
doSyncEnvStopTimer
(
gSyncEnv
,
pTimer
);
}
// local function -----------------
static
void
syncEnvTick
(
void
*
param
,
void
*
tmrId
)
{
SSyncEnv
*
pSyncEnv
=
(
SSyncEnv
*
)
param
;
sTrace
(
"syncEnvTick ... name:%s "
,
pSyncEnv
->
name
);
pSyncEnv
->
pEnvTickTimer
=
taosTmrStart
(
syncEnvTick
,
1000
,
pSyncEnv
,
pSyncEnv
->
pTimerManager
);
}
static
int32_t
doSyncEnvStart
(
SSyncEnv
*
pSyncEnv
)
{
snprintf
(
pSyncEnv
->
name
,
sizeof
(
pSyncEnv
->
name
),
"SyncEnv_%p"
,
pSyncEnv
);
// start tmr thread
pSyncEnv
->
pTimerManager
=
taosTmrInit
(
1000
,
50
,
10000
,
"SYNC-ENV"
);
pSyncEnv
->
pEnvTickTimer
=
taosTmrStart
(
syncEnvTick
,
1000
,
pSyncEnv
,
pSyncEnv
->
pTimerManager
);
sTrace
(
"SyncEnv start ok, name:%s"
,
pSyncEnv
->
name
);
return
0
;
}
static
int32_t
doSyncEnvStop
(
SSyncEnv
*
pSyncEnv
)
{
taosTmrCleanUp
(
pSyncEnv
->
pTimerManager
);
return
0
;
}
static
tmr_h
doSyncEnvStartTimer
(
SSyncEnv
*
pSyncEnv
,
TAOS_TMR_CALLBACK
fp
,
int
mseconds
,
void
*
param
)
{
return
taosTmrStart
(
fp
,
mseconds
,
pSyncEnv
,
pSyncEnv
->
pTimerManager
);
}
static
int32_t
doSyncEnvStop
(
SSyncEnv
*
pSyncEnv
)
{
return
0
;
}
static
void
doSyncEnvStopTimer
(
SSyncEnv
*
pSyncEnv
,
tmr_h
*
pTimer
)
{}
// --------------------------------
\ No newline at end of file
source/libs/sync/src/syncIO.c
浏览文件 @
02f0f85a
...
@@ -22,12 +22,49 @@
...
@@ -22,12 +22,49 @@
SSyncIO
*
gSyncIO
=
NULL
;
SSyncIO
*
gSyncIO
=
NULL
;
// local function ------------
static
void
*
syncConsumer
(
void
*
param
);
static
int
retrieveAuthInfo
(
void
*
parent
,
char
*
meterId
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
);
static
void
processResponse
(
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
static
void
processRequestMsg
(
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
static
void
syncTick
(
void
*
param
,
void
*
tmrId
);
static
int32_t
doSyncIOStart
(
SSyncIO
*
io
);
static
int32_t
doSyncIOStop
(
SSyncIO
*
io
);
static
int32_t
doSyncIOPing
(
SSyncIO
*
io
);
static
int32_t
doSyncIOOnMsg
(
struct
SSyncIO
*
io
,
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
static
int32_t
doSyncIODestroy
(
SSyncIO
*
io
);
// ----------------------------
int32_t
syncIOSendMsg
(
void
*
handle
,
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
)
{
return
0
;
}
int32_t
syncIOSendMsg
(
void
*
handle
,
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
)
{
return
0
;
}
int32_t
syncIOStart
()
{
return
0
;
}
int32_t
syncIOStart
()
{
gSyncIO
=
syncIOCreate
();
assert
(
gSyncIO
!=
NULL
);
return
0
;
}
int32_t
syncIOStop
()
{
return
0
;
}
int32_t
syncIOStop
()
{
return
0
;
}
SSyncIO
*
syncIOCreate
()
{
SSyncIO
*
io
=
(
SSyncIO
*
)
malloc
(
sizeof
(
SSyncIO
));
memset
(
io
,
0
,
sizeof
(
*
io
));
io
->
pMsgQ
=
taosOpenQueue
();
io
->
pQset
=
taosOpenQset
();
taosAddIntoQset
(
io
->
pQset
,
io
->
pMsgQ
,
NULL
);
io
->
start
=
doSyncIOStart
;
io
->
stop
=
doSyncIOStop
;
io
->
ping
=
doSyncIOPing
;
io
->
onMsg
=
doSyncIOOnMsg
;
io
->
destroy
=
doSyncIODestroy
;
return
io
;
}
// local function ------------
static
void
syncTick
(
void
*
param
,
void
*
tmrId
)
{
static
void
syncTick
(
void
*
param
,
void
*
tmrId
)
{
SSyncIO
*
io
=
(
SSyncIO
*
)
param
;
SSyncIO
*
io
=
(
SSyncIO
*
)
param
;
sDebug
(
"syncTick ... "
);
sDebug
(
"syncTick ... "
);
...
@@ -46,14 +83,15 @@ static void syncTick(void *param, void *tmrId) {
...
@@ -46,14 +83,15 @@ static void syncTick(void *param, void *tmrId) {
taosWriteQitem
(
io
->
pMsgQ
,
pTemp
);
taosWriteQitem
(
io
->
pMsgQ
,
pTemp
);
io
->
syncTimer
=
taosTmrStart
(
syncTick
,
1000
,
io
,
io
->
syncTimerManager
);
bool
b
=
taosTmrReset
(
syncTick
,
1000
,
io
,
io
->
syncTimerManager
,
io
->
syncTimer
);
assert
(
b
);
}
}
void
*
syncConsumer
(
void
*
param
)
{
static
void
*
syncConsumer
(
void
*
param
)
{
SSyncIO
*
io
=
param
;
SSyncIO
*
io
=
param
;
STaosQall
*
qall
;
STaosQall
*
qall
;
SRpcMsg
*
pRpcMsg
,
rpcMsg
;
SRpcMsg
*
pRpcMsg
,
rpcMsg
;
int
type
;
int
type
;
qall
=
taosAllocateQall
();
qall
=
taosAllocateQall
();
...
@@ -114,23 +152,6 @@ static void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
...
@@ -114,23 +152,6 @@ static void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
taosWriteQitem
(
io
->
pMsgQ
,
pTemp
);
taosWriteQitem
(
io
->
pMsgQ
,
pTemp
);
}
}
SSyncIO
*
syncIOCreate
()
{
SSyncIO
*
io
=
(
SSyncIO
*
)
malloc
(
sizeof
(
SSyncIO
));
memset
(
io
,
0
,
sizeof
(
*
io
));
io
->
pMsgQ
=
taosOpenQueue
();
io
->
pQset
=
taosOpenQset
();
taosAddIntoQset
(
io
->
pQset
,
io
->
pMsgQ
,
NULL
);
io
->
start
=
doSyncIOStart
;
io
->
stop
=
doSyncIOStop
;
io
->
ping
=
doSyncIOPing
;
io
->
onMsg
=
doSyncIOOnMsg
;
io
->
destroy
=
doSyncIODestroy
;
return
io
;
}
static
int32_t
doSyncIOStart
(
SSyncIO
*
io
)
{
static
int32_t
doSyncIOStart
(
SSyncIO
*
io
)
{
taosBlockSIGPIPE
();
taosBlockSIGPIPE
();
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
02f0f85a
...
@@ -18,9 +18,26 @@
...
@@ -18,9 +18,26 @@
#include "syncInt.h"
#include "syncInt.h"
#include "syncRaft.h"
#include "syncRaft.h"
int32_t
syncInit
()
{
return
0
;
}
static
int32_t
tsNodeRefId
=
-
1
;
// ------ local funciton ---------
static
int32_t
doSyncNodePing
(
struct
SSyncNode
*
ths
,
const
SyncPing
*
pMsg
);
static
int32_t
onSyncNodePing
(
struct
SSyncNode
*
ths
,
SyncPing
*
pMsg
);
static
int32_t
onSyncNodePingReply
(
struct
SSyncNode
*
ths
,
SyncPingReply
*
pMsg
);
static
int32_t
doSyncNodeRequestVote
(
struct
SSyncNode
*
ths
,
const
SyncRequestVote
*
pMsg
);
static
int32_t
onSyncNodeRequestVote
(
struct
SSyncNode
*
ths
,
SyncRequestVote
*
pMsg
);
static
int32_t
onSyncNodeRequestVoteReply
(
struct
SSyncNode
*
ths
,
SyncRequestVoteReply
*
pMsg
);
static
int32_t
doSyncNodeAppendEntries
(
struct
SSyncNode
*
ths
,
const
SyncAppendEntries
*
pMsg
);
static
int32_t
onSyncNodeAppendEntries
(
struct
SSyncNode
*
ths
,
SyncAppendEntries
*
pMsg
);
static
int32_t
onSyncNodeAppendEntriesReply
(
struct
SSyncNode
*
ths
,
SyncAppendEntriesReply
*
pMsg
);
// ---------------------------------
int32_t
syncInit
()
{
sTrace
(
"syncInit ok"
);
return
0
;
}
void
syncCleanUp
()
{}
void
syncCleanUp
()
{
sTrace
(
"syncCleanUp ok"
);
}
int64_t
syncStart
(
const
SSyncInfo
*
pSyncInfo
)
{
int64_t
syncStart
(
const
SSyncInfo
*
pSyncInfo
)
{
SSyncNode
*
pSyncNode
=
syncNodeOpen
(
pSyncInfo
);
SSyncNode
*
pSyncNode
=
syncNodeOpen
(
pSyncInfo
);
...
@@ -59,51 +76,57 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
...
@@ -59,51 +76,57 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
void
syncNodeClose
(
SSyncNode
*
pSyncNode
)
{
void
syncNodeClose
(
SSyncNode
*
pSyncNode
)
{
assert
(
pSyncNode
!=
NULL
);
assert
(
pSyncNode
!=
NULL
);
raftClose
(
pSyncNode
->
pRaft
);
free
(
pSyncNode
);
free
(
pSyncNode
);
}
}
void
syncNodePingAll
(
SSyncNode
*
pSyncNode
)
{
sTrace
(
"syncNodePingAll %p "
,
pSyncNode
);
}
void
syncNodePingPeers
(
SSyncNode
*
pSyncNode
)
{}
void
syncNodePingSelf
(
SSyncNode
*
pSyncNode
)
{}
// ------ local funciton ---------
static
int32_t
doSyncNodePing
(
struct
SSyncNode
*
ths
,
const
SyncPing
*
pMsg
)
{
static
int32_t
doSyncNodePing
(
struct
SSyncNode
*
ths
,
const
SyncPing
*
pMsg
)
{
int32_t
ret
=
ths
->
pRaft
->
FpPing
(
ths
->
pRaft
,
pMsg
)
;
int32_t
ret
=
0
;
return
ret
;
return
ret
;
}
}
static
int32_t
onSyncNodePing
(
struct
SSyncNode
*
ths
,
SyncPing
*
pMsg
)
{
static
int32_t
onSyncNodePing
(
struct
SSyncNode
*
ths
,
SyncPing
*
pMsg
)
{
int32_t
ret
=
ths
->
pRaft
->
FpOnPing
(
ths
->
pRaft
,
pMsg
)
;
int32_t
ret
=
0
;
return
ret
;
return
ret
;
}
}
static
int32_t
onSyncNodePingReply
(
struct
SSyncNode
*
ths
,
SyncPingReply
*
pMsg
)
{
static
int32_t
onSyncNodePingReply
(
struct
SSyncNode
*
ths
,
SyncPingReply
*
pMsg
)
{
int32_t
ret
=
ths
->
pRaft
->
FpOnPingReply
(
ths
->
pRaft
,
pMsg
)
;
int32_t
ret
=
0
;
return
ret
;
return
ret
;
}
}
static
int32_t
doSyncNodeRequestVote
(
struct
SSyncNode
*
ths
,
const
SyncRequestVote
*
pMsg
)
{
static
int32_t
doSyncNodeRequestVote
(
struct
SSyncNode
*
ths
,
const
SyncRequestVote
*
pMsg
)
{
int32_t
ret
=
ths
->
pRaft
->
FpRequestVote
(
ths
->
pRaft
,
pMsg
)
;
int32_t
ret
=
0
;
return
ret
;
return
ret
;
}
}
static
int32_t
onSyncNodeRequestVote
(
struct
SSyncNode
*
ths
,
SyncRequestVote
*
pMsg
)
{
static
int32_t
onSyncNodeRequestVote
(
struct
SSyncNode
*
ths
,
SyncRequestVote
*
pMsg
)
{
int32_t
ret
=
ths
->
pRaft
->
FpOnRequestVote
(
ths
->
pRaft
,
pMsg
)
;
int32_t
ret
=
0
;
return
ret
;
return
ret
;
}
}
static
int32_t
onSyncNodeRequestVoteReply
(
struct
SSyncNode
*
ths
,
SyncRequestVoteReply
*
pMsg
)
{
static
int32_t
onSyncNodeRequestVoteReply
(
struct
SSyncNode
*
ths
,
SyncRequestVoteReply
*
pMsg
)
{
int32_t
ret
=
ths
->
pRaft
->
FpOnRequestVoteReply
(
ths
->
pRaft
,
pMsg
)
;
int32_t
ret
=
0
;
return
ret
;
return
ret
;
}
}
static
int32_t
doSyncNodeAppendEntries
(
struct
SSyncNode
*
ths
,
const
SyncAppendEntries
*
pMsg
)
{
static
int32_t
doSyncNodeAppendEntries
(
struct
SSyncNode
*
ths
,
const
SyncAppendEntries
*
pMsg
)
{
int32_t
ret
=
ths
->
pRaft
->
FpAppendEntries
(
ths
->
pRaft
,
pMsg
)
;
int32_t
ret
=
0
;
return
ret
;
return
ret
;
}
}
static
int32_t
onSyncNodeAppendEntries
(
struct
SSyncNode
*
ths
,
SyncAppendEntries
*
pMsg
)
{
static
int32_t
onSyncNodeAppendEntries
(
struct
SSyncNode
*
ths
,
SyncAppendEntries
*
pMsg
)
{
int32_t
ret
=
ths
->
pRaft
->
FpOnAppendEntries
(
ths
->
pRaft
,
pMsg
)
;
int32_t
ret
=
0
;
return
ret
;
return
ret
;
}
}
static
int32_t
onSyncNodeAppendEntriesReply
(
struct
SSyncNode
*
ths
,
SyncAppendEntriesReply
*
pMsg
)
{
static
int32_t
onSyncNodeAppendEntriesReply
(
struct
SSyncNode
*
ths
,
SyncAppendEntriesReply
*
pMsg
)
{
int32_t
ret
=
ths
->
pRaft
->
FpOnAppendEntriesReply
(
ths
->
pRaft
,
pMsg
)
;
int32_t
ret
=
0
;
return
ret
;
return
ret
;
}
}
\ No newline at end of file
source/libs/sync/src/syncRaft.c
浏览文件 @
02f0f85a
...
@@ -16,6 +16,8 @@
...
@@ -16,6 +16,8 @@
#include "syncRaft.h"
#include "syncRaft.h"
#include "sync.h"
#include "sync.h"
#if 0
SRaft* raftOpen(SRaftId raftId, SSyncFSM* pFsm) {
SRaft* raftOpen(SRaftId raftId, SSyncFSM* pFsm) {
SRaft* pRaft = (SRaft*)malloc(sizeof(SRaft));
SRaft* pRaft = (SRaft*)malloc(sizeof(SRaft));
assert(pRaft != NULL);
assert(pRaft != NULL);
...
@@ -64,3 +66,5 @@ static int32_t onRaftAppendEntriesReply(struct SRaft* ths, RaftAppendEntriesRepl
...
@@ -64,3 +66,5 @@ static int32_t onRaftAppendEntriesReply(struct SRaft* ths, RaftAppendEntriesRepl
int32_t raftPropose(SRaft* pRaft, const SSyncBuffer* pBuf, bool isWeak) { return 0; }
int32_t raftPropose(SRaft* pRaft, const SSyncBuffer* pBuf, bool isWeak) { return 0; }
static int raftSendMsg(SRaftId destRaftId, const void* pMsg, const SRaft* pRaft) { return 0; }
static int raftSendMsg(SRaftId destRaftId, const void* pMsg, const SRaft* pRaft) { return 0; }
#endif
\ No newline at end of file
source/libs/sync/src/syncRaftStore.c
浏览文件 @
02f0f85a
...
@@ -18,24 +18,22 @@
...
@@ -18,24 +18,22 @@
// to complie success: FileIO interface is modified
// to complie success: FileIO interface is modified
SRaftStore
*
raftStoreOpen
(
const
char
*
path
)
{
return
NULL
;}
SRaftStore
*
raftStoreOpen
(
const
char
*
path
)
{
return
NULL
;
}
static
int32_t
raftStoreInit
(
SRaftStore
*
pRaftStore
)
{
return
0
;}
static
int32_t
raftStoreInit
(
SRaftStore
*
pRaftStore
)
{
return
0
;
}
int32_t
raftStoreClose
(
SRaftStore
*
pRaftStore
)
{
return
0
;}
int32_t
raftStoreClose
(
SRaftStore
*
pRaftStore
)
{
return
0
;
}
int32_t
raftStorePersist
(
SRaftStore
*
pRaftStore
)
{
return
0
;}
int32_t
raftStorePersist
(
SRaftStore
*
pRaftStore
)
{
return
0
;
}
static
bool
raftStoreFileExist
(
char
*
path
)
{
return
0
;}
static
bool
raftStoreFileExist
(
char
*
path
)
{
return
0
;
}
int32_t
raftStoreSerialize
(
SRaftStore
*
pRaftStore
,
char
*
buf
,
size_t
len
)
{
return
0
;}
int32_t
raftStoreSerialize
(
SRaftStore
*
pRaftStore
,
char
*
buf
,
size_t
len
)
{
return
0
;
}
int32_t
raftStoreDeserialize
(
SRaftStore
*
pRaftStore
,
char
*
buf
,
size_t
len
)
{
return
0
;}
int32_t
raftStoreDeserialize
(
SRaftStore
*
pRaftStore
,
char
*
buf
,
size_t
len
)
{
return
0
;
}
void
raftStorePrint
(
SRaftStore
*
pRaftStore
)
{}
void
raftStorePrint
(
SRaftStore
*
pRaftStore
)
{}
#if 0
#if 0
SRaftStore *raftStoreOpen(const char *path) {
SRaftStore *raftStoreOpen(const char *path) {
...
...
source/libs/sync/test/syncEnvTest.cpp
浏览文件 @
02f0f85a
...
@@ -34,19 +34,20 @@ void doSync() {
...
@@ -34,19 +34,20 @@ void doSync() {
}
}
int
main
()
{
int
main
()
{
//taosInitLog((char*)"syncEnvTest.log", 100000, 10);
//
taosInitLog((char*)"syncEnvTest.log", 100000, 10);
tsAsyncLog
=
0
;
tsAsyncLog
=
0
;
sDebugFlag
=
143
+
64
;
sDebugFlag
=
143
+
64
;
int32_t
ret
;
logTest
();
logTest
();
int32_t
ret
=
syncIOStart
();
//
ret = syncIOStart();
assert
(
ret
==
0
);
//
assert(ret == 0);
ret
=
syncEnvStart
();
ret
=
syncEnvStart
();
assert
(
ret
==
0
);
assert
(
ret
==
0
);
doSync
();
//
doSync();
while
(
1
)
{
while
(
1
)
{
taosMsleep
(
1000
);
taosMsleep
(
1000
);
...
...
source/libs/sync/test/syncPingTest.cpp
浏览文件 @
02f0f85a
...
@@ -13,7 +13,7 @@ void logTest() {
...
@@ -13,7 +13,7 @@ void logTest() {
sFatal
(
"--- sync log test: fatal"
);
sFatal
(
"--- sync log test: fatal"
);
}
}
void
doSync
()
{
SSyncNode
*
doSync
()
{
SSyncFSM
*
pFsm
;
SSyncFSM
*
pFsm
;
SSyncInfo
syncInfo
;
SSyncInfo
syncInfo
;
...
@@ -40,10 +40,17 @@ void doSync() {
...
@@ -40,10 +40,17 @@ void doSync() {
gSyncIO
->
FpOnPing
=
pSyncNode
->
FpOnPing
;
gSyncIO
->
FpOnPing
=
pSyncNode
->
FpOnPing
;
gSyncIO
->
pSyncNode
=
pSyncNode
;
gSyncIO
->
pSyncNode
=
pSyncNode
;
return
pSyncNode
;
}
void
timerPingAll
(
void
*
param
,
void
*
tmrId
)
{
SSyncNode
*
pSyncNode
=
(
SSyncNode
*
)
param
;
syncNodePingAll
(
pSyncNode
);
}
}
int
main
()
{
int
main
()
{
//taosInitLog((char*)"syncPingTest.log", 100000, 10);
//
taosInitLog((char*)"syncPingTest.log", 100000, 10);
tsAsyncLog
=
0
;
tsAsyncLog
=
0
;
sDebugFlag
=
143
+
64
;
sDebugFlag
=
143
+
64
;
...
@@ -55,7 +62,9 @@ int main() {
...
@@ -55,7 +62,9 @@ int main() {
ret
=
syncEnvStart
();
ret
=
syncEnvStart
();
assert
(
ret
==
0
);
assert
(
ret
==
0
);
doSync
();
SSyncNode
*
pSyncNode
=
doSync
();
pSyncNode
->
pPingTimer
=
syncEnvStartTimer
(
timerPingAll
,
1000
,
pSyncNode
);
while
(
1
)
{
while
(
1
)
{
taosMsleep
(
1000
);
taosMsleep
(
1000
);
...
...
source/libs/sync/test/syncTest.cpp
浏览文件 @
02f0f85a
#include <stdio.h>
#include <stdio.h>
#include "gtest/gtest.h"
#include "syncIO.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncInt.h"
#include "syncRaftStore.h"
#include "syncRaftStore.h"
#include "gtest/gtest.h"
void
*
pingFunc
(
void
*
param
)
{
void
*
pingFunc
(
void
*
param
)
{
SSyncIO
*
io
=
(
SSyncIO
*
)
param
;
SSyncIO
*
io
=
(
SSyncIO
*
)
param
;
...
@@ -15,7 +15,7 @@ void *pingFunc(void *param) {
...
@@ -15,7 +15,7 @@ void *pingFunc(void *param) {
}
}
int
main
()
{
int
main
()
{
//taosInitLog((char *)"syncTest.log", 100000, 10);
//
taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog
=
0
;
tsAsyncLog
=
0
;
sDebugFlag
=
143
+
64
;
sDebugFlag
=
143
+
64
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录