Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
c9efe3de
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
c9efe3de
编写于
2月 28, 2022
作者:
M
Minghao Li
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
sync modify timer
上级
02f0f85a
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
133 addition
and
62 deletion
+133
-62
source/libs/sync/inc/syncEnv.h
source/libs/sync/inc/syncEnv.h
+4
-0
source/libs/sync/inc/syncIO.h
source/libs/sync/inc/syncIO.h
+6
-7
source/libs/sync/inc/syncInt.h
source/libs/sync/inc/syncInt.h
+31
-10
source/libs/sync/src/syncEnv.c
source/libs/sync/src/syncEnv.c
+1
-1
source/libs/sync/src/syncIO.c
source/libs/sync/src/syncIO.c
+38
-37
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+44
-4
source/libs/sync/test/syncPingTest.cpp
source/libs/sync/test/syncPingTest.cpp
+9
-3
未找到文件。
source/libs/sync/inc/syncEnv.h
浏览文件 @
c9efe3de
...
...
@@ -28,12 +28,16 @@ extern "C" {
#include "trpc.h"
#include "ttimer.h"
#define TIMER_MAX_MS 0x7FFFFFFF
typedef
struct
SSyncEnv
{
tmr_h
pEnvTickTimer
;
tmr_h
pTimerManager
;
char
name
[
128
];
}
SSyncEnv
;
extern
SSyncEnv
*
gSyncEnv
;
int32_t
syncEnvStart
();
int32_t
syncEnvStop
();
...
...
source/libs/sync/inc/syncIO.h
浏览文件 @
c9efe3de
...
...
@@ -30,10 +30,10 @@ extern "C" {
#include "trpc.h"
typedef
struct
SSyncIO
{
void
*
serverRpc
;
void
*
clientRpc
;
void
*
serverRpc
;
void
*
clientRpc
;
STaosQueue
*
pMsgQ
;
STaosQset
*
pQset
;
STaosQset
*
pQset
;
pthread_t
tid
;
int8_t
isStart
;
...
...
@@ -56,10 +56,9 @@ typedef struct SSyncIO {
extern
SSyncIO
*
gSyncIO
;
int32_t
syncIOStart
();
int32_t
syncIOStop
();
int32_t
syncIOSendMsg
(
void
*
handle
,
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
);
SSyncIO
*
syncIOCreate
();
int32_t
syncIOStart
();
int32_t
syncIOStop
();
int32_t
syncIOSendMsg
(
void
*
handle
,
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
);
#ifdef __cplusplus
}
...
...
source/libs/sync/inc/syncInt.h
浏览文件 @
c9efe3de
...
...
@@ -88,6 +88,9 @@ typedef struct SyncAppendEntries SyncAppendEntries;
struct
SyncAppendEntriesReply
;
typedef
struct
SyncAppendEntriesReply
SyncAppendEntriesReply
;
struct
SSyncEnv
;
typedef
struct
SSyncEnv
SSyncEnv
;
typedef
struct
SRaftId
{
SyncNodeId
addr
;
SyncGroupId
vgId
;
...
...
@@ -103,32 +106,46 @@ typedef struct SSyncNode {
SSyncCfg
syncCfg
;
char
path
[
TSDB_FILENAME_LEN
];
SNodeInfo
me
;
SNodeInfo
peers
[
TSDB_MAX_REPLICA
];
int32_t
peersNum
;
ESyncRole
role
;
SRaftId
raftId
;
SSyncFSM
*
pFsm
;
tmr_h
pPingTimer
;
tmr_h
pElectionTimer
;
tmr_h
pHeartbeatTimer
;
int32_t
(
*
FpPing
)(
struct
SSyncNode
*
ths
,
const
SyncPing
*
pMsg
);
tmr_h
pPingTimer
;
int32_t
pingTimerMS
;
uint8_t
pingTimerStart
;
TAOS_TMR_CALLBACK
FpPingTimer
;
// Timer Fp
uint64_t
pingTimerCounter
;
tmr_h
pElectTimer
;
int32_t
electTimerMS
;
uint8_t
electTimerStart
;
TAOS_TMR_CALLBACK
FpElectTimer
;
// Timer Fp
uint64_t
electTimerCounter
;
tmr_h
pHeartbeatTimer
;
int32_t
heartbeatTimerMS
;
uint8_t
heartbeatTimerStart
;
TAOS_TMR_CALLBACK
FpHeartbeatTimer
;
// Timer Fp
uint64_t
heartbeatTimerCounter
;
// callback
int32_t
(
*
FpOnPing
)(
struct
SSyncNode
*
ths
,
SyncPing
*
pMsg
);
int32_t
(
*
FpOnPingReply
)(
struct
SSyncNode
*
ths
,
SyncPingReply
*
pMsg
);
int32_t
(
*
FpRequestVote
)(
struct
SSyncNode
*
ths
,
const
SyncRequestVote
*
pMsg
);
int32_t
(
*
FpOnRequestVote
)(
struct
SSyncNode
*
ths
,
SyncRequestVote
*
pMsg
);
int32_t
(
*
FpOnRequestVoteReply
)(
struct
SSyncNode
*
ths
,
SyncRequestVoteReply
*
pMsg
);
int32_t
(
*
FpAppendEntries
)(
struct
SSyncNode
*
ths
,
const
SyncAppendEntries
*
pMsg
);
int32_t
(
*
FpOnAppendEntries
)(
struct
SSyncNode
*
ths
,
SyncAppendEntries
*
pMsg
);
int32_t
(
*
FpOnAppendEntriesReply
)(
struct
SSyncNode
*
ths
,
SyncAppendEntriesReply
*
pMsg
);
// passed from outside
int32_t
(
*
FpSendMsg
)(
void
*
handle
,
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
);
}
SSyncNode
;
...
...
@@ -143,6 +160,10 @@ void syncNodePingPeers(SSyncNode* pSyncNode);
void
syncNodePingSelf
(
SSyncNode
*
pSyncNode
);
int32_t
syncNodeStartPingTimer
(
SSyncNode
*
pSyncNode
);
int32_t
syncNodeStopPingTimer
(
SSyncNode
*
pSyncNode
);
#ifdef __cplusplus
}
#endif
...
...
source/libs/sync/src/syncEnv.c
浏览文件 @
c9efe3de
...
...
@@ -59,7 +59,7 @@ static int32_t doSyncEnvStart(SSyncEnv *pSyncEnv) {
// start tmr thread
pSyncEnv
->
pTimerManager
=
taosTmrInit
(
1000
,
50
,
10000
,
"SYNC-ENV"
);
pSyncEnv
->
pEnvTickTimer
=
taosTmrStart
(
syncEnvTick
,
1000
,
pSyncEnv
,
pSyncEnv
->
pTimerManager
);
//
pSyncEnv->pEnvTickTimer = taosTmrStart(syncEnvTick, 1000, pSyncEnv, pSyncEnv->pTimerManager);
sTrace
(
"SyncEnv start ok, name:%s"
,
pSyncEnv
->
name
);
...
...
source/libs/sync/src/syncIO.c
浏览文件 @
c9efe3de
...
...
@@ -23,17 +23,18 @@
SSyncIO
*
gSyncIO
=
NULL
;
// local function ------------
static
void
*
syncConsumer
(
void
*
param
);
static
int
retrieveAuthInfo
(
void
*
parent
,
char
*
meterId
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
);
static
void
processResponse
(
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
static
void
processRequestMsg
(
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
static
void
syncTick
(
void
*
param
,
void
*
tmrId
);
static
int32_t
doSyncIOStart
(
SSyncIO
*
io
);
static
int32_t
doSyncIOStop
(
SSyncIO
*
io
);
static
int32_t
doSyncIOPing
(
SSyncIO
*
io
);
static
int32_t
doSyncIOOnMsg
(
struct
SSyncIO
*
io
,
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
static
int32_t
doSyncIODestroy
(
SSyncIO
*
io
);
static
SSyncIO
*
syncIOCreate
();
static
void
*
syncIOConsumer
(
void
*
param
);
static
int
syncIOAuth
(
void
*
parent
,
char
*
meterId
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
);
static
void
syncIODoReply
(
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
static
void
syncIODoRequest
(
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
static
void
syncIOTick
(
void
*
param
,
void
*
tmrId
);
// ----------------------------
int32_t
syncIOSendMsg
(
void
*
handle
,
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
)
{
return
0
;
}
...
...
@@ -47,27 +48,10 @@ int32_t syncIOStart() {
int32_t
syncIOStop
()
{
return
0
;
}
SSyncIO
*
syncIOCreate
()
{
SSyncIO
*
io
=
(
SSyncIO
*
)
malloc
(
sizeof
(
SSyncIO
));
memset
(
io
,
0
,
sizeof
(
*
io
));
io
->
pMsgQ
=
taosOpenQueue
();
io
->
pQset
=
taosOpenQset
();
taosAddIntoQset
(
io
->
pQset
,
io
->
pMsgQ
,
NULL
);
io
->
start
=
doSyncIOStart
;
io
->
stop
=
doSyncIOStop
;
io
->
ping
=
doSyncIOPing
;
io
->
onMsg
=
doSyncIOOnMsg
;
io
->
destroy
=
doSyncIODestroy
;
return
io
;
}
// local function ------------
static
void
syncTick
(
void
*
param
,
void
*
tmrId
)
{
static
void
sync
IO
Tick
(
void
*
param
,
void
*
tmrId
)
{
SSyncIO
*
io
=
(
SSyncIO
*
)
param
;
sDebug
(
"syncTick ... "
);
sDebug
(
"sync
IO
Tick ... "
);
SRpcMsg
rpcMsg
;
rpcMsg
.
pCont
=
rpcMallocCont
(
10
);
...
...
@@ -83,15 +67,15 @@ static void syncTick(void *param, void *tmrId) {
taosWriteQitem
(
io
->
pMsgQ
,
pTemp
);
bool
b
=
taosTmrReset
(
syncTick
,
1000
,
io
,
io
->
syncTimerManager
,
io
->
syncTimer
);
bool
b
=
taosTmrReset
(
sync
IO
Tick
,
1000
,
io
,
io
->
syncTimerManager
,
io
->
syncTimer
);
assert
(
b
);
}
static
void
*
syncConsumer
(
void
*
param
)
{
static
void
*
sync
IO
Consumer
(
void
*
param
)
{
SSyncIO
*
io
=
param
;
STaosQall
*
qall
;
SRpcMsg
*
pRpcMsg
,
rpcMsg
;
SRpcMsg
*
pRpcMsg
,
rpcMsg
;
int
type
;
qall
=
taosAllocateQall
();
...
...
@@ -129,19 +113,19 @@ static void *syncConsumer(void *param) {
return
NULL
;
}
static
int
retrieveAuthInfo
(
void
*
parent
,
char
*
meterId
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
)
{
static
int
syncIOAuth
(
void
*
parent
,
char
*
meterId
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
)
{
// app shall retrieve the auth info based on meterID from DB or a data file
// demo code here only for simple demo
int
ret
=
0
;
return
ret
;
}
static
void
processResponse
(
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
sDebug
(
"
processResponse
... "
);
static
void
syncIODoReply
(
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
sDebug
(
"
syncIODoReply
... "
);
rpcFreeCont
(
pMsg
->
pCont
);
}
static
void
processRequestMsg
(
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
static
void
syncIODoRequest
(
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
SSyncIO
*
io
=
pParent
;
SRpcMsg
*
pTemp
;
...
...
@@ -152,6 +136,23 @@ static void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
taosWriteQitem
(
io
->
pMsgQ
,
pTemp
);
}
static
SSyncIO
*
syncIOCreate
()
{
SSyncIO
*
io
=
(
SSyncIO
*
)
malloc
(
sizeof
(
SSyncIO
));
memset
(
io
,
0
,
sizeof
(
*
io
));
io
->
pMsgQ
=
taosOpenQueue
();
io
->
pQset
=
taosOpenQset
();
taosAddIntoQset
(
io
->
pQset
,
io
->
pMsgQ
,
NULL
);
io
->
start
=
doSyncIOStart
;
io
->
stop
=
doSyncIOStop
;
io
->
ping
=
doSyncIOPing
;
io
->
onMsg
=
doSyncIOOnMsg
;
io
->
destroy
=
doSyncIODestroy
;
return
io
;
}
static
int32_t
doSyncIOStart
(
SSyncIO
*
io
)
{
taosBlockSIGPIPE
();
...
...
@@ -164,7 +165,7 @@ static int32_t doSyncIOStart(SSyncIO *io) {
rpcInit
.
localPort
=
0
;
rpcInit
.
label
=
"SYNC-IO-CLIENT"
;
rpcInit
.
numOfThreads
=
1
;
rpcInit
.
cfp
=
processResponse
;
rpcInit
.
cfp
=
syncIODoReply
;
rpcInit
.
sessions
=
100
;
rpcInit
.
idleTime
=
100
;
rpcInit
.
user
=
"sync-io"
;
...
...
@@ -187,10 +188,10 @@ static int32_t doSyncIOStart(SSyncIO *io) {
rpcInit
.
localPort
=
38000
;
rpcInit
.
label
=
"SYNC-IO-SERVER"
;
rpcInit
.
numOfThreads
=
1
;
rpcInit
.
cfp
=
processRequestMsg
;
rpcInit
.
cfp
=
syncIODoRequest
;
rpcInit
.
sessions
=
1000
;
rpcInit
.
idleTime
=
2
*
1500
;
rpcInit
.
afp
=
retrieveAuthInfo
;
rpcInit
.
afp
=
syncIOAuth
;
rpcInit
.
parent
=
io
;
rpcInit
.
connType
=
TAOS_CONN_SERVER
;
...
...
@@ -206,7 +207,7 @@ static int32_t doSyncIOStart(SSyncIO *io) {
// start consumer thread
{
if
(
pthread_create
(
&
io
->
tid
,
NULL
,
syncConsumer
,
io
)
!=
0
)
{
if
(
pthread_create
(
&
io
->
tid
,
NULL
,
sync
IO
Consumer
,
io
)
!=
0
)
{
sError
(
"failed to create sync consumer thread since %s"
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
...
...
@@ -215,7 +216,7 @@ static int32_t doSyncIOStart(SSyncIO *io) {
// start tmr thread
io
->
syncTimerManager
=
taosTmrInit
(
1000
,
50
,
10000
,
"SYNC"
);
io
->
syncTimer
=
taosTmrStart
(
syncTick
,
1000
,
io
,
io
->
syncTimerManager
);
io
->
syncTimer
=
taosTmrStart
(
sync
IO
Tick
,
1000
,
io
,
io
->
syncTimerManager
);
return
0
;
}
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
c9efe3de
...
...
@@ -15,6 +15,7 @@
#include <stdint.h>
#include "sync.h"
#include "syncEnv.h"
#include "syncInt.h"
#include "syncRaft.h"
...
...
@@ -30,6 +31,7 @@ static int32_t onSyncNodeRequestVoteReply(struct SSyncNode* ths, SyncRequestVote
static
int32_t
doSyncNodeAppendEntries
(
struct
SSyncNode
*
ths
,
const
SyncAppendEntries
*
pMsg
);
static
int32_t
onSyncNodeAppendEntries
(
struct
SSyncNode
*
ths
,
SyncAppendEntries
*
pMsg
);
static
int32_t
onSyncNodeAppendEntriesReply
(
struct
SSyncNode
*
ths
,
SyncAppendEntriesReply
*
pMsg
);
static
void
syncNodePingTimerCb
(
void
*
param
,
void
*
tmrId
);
// ---------------------------------
int32_t
syncInit
()
{
...
...
@@ -58,16 +60,19 @@ void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole) {}
SSyncNode
*
syncNodeOpen
(
const
SSyncInfo
*
pSyncInfo
)
{
SSyncNode
*
pSyncNode
=
(
SSyncNode
*
)
malloc
(
sizeof
(
SSyncNode
));
assert
(
pSyncNode
!=
NULL
);
memset
(
pSyncNode
,
0
,
sizeof
(
SSyncNode
));
pSyncNode
->
FpSendMsg
=
pSyncInfo
->
FpSendMsg
;
pSyncNode
->
pPingTimer
=
NULL
;
pSyncNode
->
pingTimerMS
=
1000
;
atomic_store_8
(
&
pSyncNode
->
pingTimerStart
,
0
);
pSyncNode
->
FpPingTimer
=
syncNodePingTimerCb
;
pSyncNode
->
pingTimerCounter
=
0
;
pSyncNode
->
Fp
Ping
=
doSyncNodePin
g
;
pSyncNode
->
Fp
SendMsg
=
pSyncInfo
->
FpSendMs
g
;
pSyncNode
->
FpOnPing
=
onSyncNodePing
;
pSyncNode
->
FpOnPingReply
=
onSyncNodePingReply
;
pSyncNode
->
FpRequestVote
=
doSyncNodeRequestVote
;
pSyncNode
->
FpOnRequestVote
=
onSyncNodeRequestVote
;
pSyncNode
->
FpOnRequestVoteReply
=
onSyncNodeRequestVoteReply
;
pSyncNode
->
FpAppendEntries
=
doSyncNodeAppendEntries
;
pSyncNode
->
FpOnAppendEntries
=
onSyncNodeAppendEntries
;
pSyncNode
->
FpOnAppendEntriesReply
=
onSyncNodeAppendEntriesReply
;
...
...
@@ -85,6 +90,25 @@ void syncNodePingPeers(SSyncNode* pSyncNode) {}
void
syncNodePingSelf
(
SSyncNode
*
pSyncNode
)
{}
int32_t
syncNodeStartPingTimer
(
SSyncNode
*
pSyncNode
)
{
if
(
pSyncNode
->
pPingTimer
==
NULL
)
{
pSyncNode
->
pPingTimer
=
taosTmrStart
(
pSyncNode
->
FpPingTimer
,
pSyncNode
->
pingTimerCounter
,
pSyncNode
,
gSyncEnv
->
pTimerManager
);
}
else
{
taosTmrReset
(
pSyncNode
->
FpPingTimer
,
pSyncNode
->
pingTimerCounter
,
pSyncNode
,
gSyncEnv
->
pTimerManager
,
&
pSyncNode
->
pPingTimer
);
}
atomic_store_8
(
&
pSyncNode
->
pingTimerStart
,
1
);
return
0
;
}
int32_t
syncNodeStopPingTimer
(
SSyncNode
*
pSyncNode
)
{
atomic_store_8
(
&
pSyncNode
->
pingTimerStart
,
0
);
pSyncNode
->
pingTimerCounter
=
TIMER_MAX_MS
;
return
0
;
}
// ------ local funciton ---------
static
int32_t
doSyncNodePing
(
struct
SSyncNode
*
ths
,
const
SyncPing
*
pMsg
)
{
int32_t
ret
=
0
;
...
...
@@ -129,4 +153,20 @@ static int32_t onSyncNodeAppendEntries(struct SSyncNode* ths, SyncAppendEntries*
static
int32_t
onSyncNodeAppendEntriesReply
(
struct
SSyncNode
*
ths
,
SyncAppendEntriesReply
*
pMsg
)
{
int32_t
ret
=
0
;
return
ret
;
}
static
void
syncNodePingTimerCb
(
void
*
param
,
void
*
tmrId
)
{
SSyncNode
*
pSyncNode
=
(
SSyncNode
*
)
param
;
if
(
atomic_load_8
(
&
pSyncNode
->
pingTimerStart
))
{
++
(
pSyncNode
->
pingTimerCounter
);
// pSyncNode->pingTimerMS += 100;
sTrace
(
"pSyncNode->pingTimerCounter:%lu, pSyncNode->pingTimerMS:%d, pSyncNode->pPingTimer:%p, tmrId:%p "
,
pSyncNode
->
pingTimerCounter
,
pSyncNode
->
pingTimerMS
,
pSyncNode
->
pPingTimer
,
tmrId
);
taosTmrReset
(
syncNodePingTimerCb
,
pSyncNode
->
pingTimerMS
,
pSyncNode
,
&
gSyncEnv
->
pTimerManager
,
&
pSyncNode
->
pPingTimer
);
syncNodePingSelf
(
pSyncNode
);
}
}
\ No newline at end of file
source/libs/sync/test/syncPingTest.cpp
浏览文件 @
c9efe3de
...
...
@@ -44,8 +44,8 @@ SSyncNode* doSync() {
return
pSyncNode
;
}
void
timerPingAll
(
void
*
param
,
void
*
tmrId
)
{
SSyncNode
*
pSyncNode
=
(
SSyncNode
*
)
param
;
void
timerPingAll
(
void
*
param
,
void
*
tmrId
)
{
SSyncNode
*
pSyncNode
=
(
SSyncNode
*
)
param
;
syncNodePingAll
(
pSyncNode
);
}
...
...
@@ -64,7 +64,13 @@ int main() {
SSyncNode
*
pSyncNode
=
doSync
();
pSyncNode
->
pPingTimer
=
syncEnvStartTimer
(
timerPingAll
,
1000
,
pSyncNode
);
ret
=
syncNodeStartPingTimer
(
pSyncNode
);
assert
(
ret
==
0
);
taosMsleep
(
5000
);
ret
=
syncNodeStopPingTimer
(
pSyncNode
);
assert
(
ret
==
0
);
while
(
1
)
{
taosMsleep
(
1000
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录