Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
9914657c
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
9914657c
编写于
11月 07, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor: adjust sync hb
上级
848fd584
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
65 addition
and
66 deletion
+65
-66
source/libs/sync/inc/syncInt.h
source/libs/sync/inc/syncInt.h
+0
-4
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+50
-47
source/libs/sync/src/syncUtil.c
source/libs/sync/src/syncUtil.c
+3
-3
source/libs/sync/src/syncVoteMgr.c
source/libs/sync/src/syncVoteMgr.c
+12
-12
未找到文件。
source/libs/sync/inc/syncInt.h
浏览文件 @
9914657c
...
@@ -111,10 +111,6 @@ typedef struct SElectTimer {
...
@@ -111,10 +111,6 @@ typedef struct SElectTimer {
void
*
pData
;
void
*
pData
;
}
SElectTimer
;
}
SElectTimer
;
int32_t
syncHbTimerInit
(
SSyncNode
*
pSyncNode
,
SSyncTimer
*
pSyncTimer
,
SRaftId
destId
);
int32_t
syncHbTimerStart
(
SSyncNode
*
pSyncNode
,
SSyncTimer
*
pSyncTimer
);
int32_t
syncHbTimerStop
(
SSyncNode
*
pSyncNode
,
SSyncTimer
*
pSyncTimer
);
typedef
struct
SPeerState
{
typedef
struct
SPeerState
{
SyncIndex
lastSendIndex
;
SyncIndex
lastSendIndex
;
int64_t
lastSendTime
;
int64_t
lastSendTime
;
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
9914657c
...
@@ -42,6 +42,9 @@ static int32_t syncNodeEqNoop(SSyncNode* ths);
...
@@ -42,6 +42,9 @@ static int32_t syncNodeEqNoop(SSyncNode* ths);
static
int32_t
syncNodeAppendNoop
(
SSyncNode
*
ths
);
static
int32_t
syncNodeAppendNoop
(
SSyncNode
*
ths
);
static
void
syncNodeEqPeerHeartbeatTimer
(
void
*
param
,
void
*
tmrId
);
static
void
syncNodeEqPeerHeartbeatTimer
(
void
*
param
,
void
*
tmrId
);
static
bool
syncIsConfigChanged
(
const
SSyncCfg
*
pOldCfg
,
const
SSyncCfg
*
pNewCfg
);
static
bool
syncIsConfigChanged
(
const
SSyncCfg
*
pOldCfg
,
const
SSyncCfg
*
pNewCfg
);
static
int32_t
syncHbTimerInit
(
SSyncNode
*
pSyncNode
,
SSyncTimer
*
pSyncTimer
,
SRaftId
destId
);
static
int32_t
syncHbTimerStart
(
SSyncNode
*
pSyncNode
,
SSyncTimer
*
pSyncTimer
);
static
int32_t
syncHbTimerStop
(
SSyncNode
*
pSyncNode
,
SSyncTimer
*
pSyncTimer
);
int64_t
syncOpen
(
SSyncInfo
*
pSyncInfo
)
{
int64_t
syncOpen
(
SSyncInfo
*
pSyncInfo
)
{
SSyncNode
*
pSyncNode
=
syncNodeOpen
(
pSyncInfo
);
SSyncNode
*
pSyncNode
=
syncNodeOpen
(
pSyncInfo
);
...
@@ -512,7 +515,7 @@ int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct
...
@@ -512,7 +515,7 @@ int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct
ASSERT(pSyncNode->pRaftCfg->configIndexCount >= 1);
ASSERT(pSyncNode->pRaftCfg->configIndexCount >= 1);
SyncIndex lastIndex = (pSyncNode->pRaftCfg->configIndexArr)[0];
SyncIndex lastIndex = (pSyncNode->pRaftCfg->configIndexArr)[0];
for (int i = 0; i < pSyncNode->pRaftCfg->configIndexCount; ++i) {
for (int
32_t
i = 0; i < pSyncNode->pRaftCfg->configIndexCount; ++i) {
if ((pSyncNode->pRaftCfg->configIndexArr)[i] > lastIndex &&
if ((pSyncNode->pRaftCfg->configIndexArr)[i] > lastIndex &&
(pSyncNode->pRaftCfg->configIndexArr)[i] <= snapshotIndex) {
(pSyncNode->pRaftCfg->configIndexArr)[i] <= snapshotIndex) {
lastIndex = (pSyncNode->pRaftCfg->configIndexArr)[i];
lastIndex = (pSyncNode->pRaftCfg->configIndexArr)[i];
...
@@ -531,7 +534,7 @@ SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapsho
...
@@ -531,7 +534,7 @@ SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapsho
ASSERT
(
pSyncNode
->
pRaftCfg
->
configIndexCount
>=
1
);
ASSERT
(
pSyncNode
->
pRaftCfg
->
configIndexCount
>=
1
);
SyncIndex
lastIndex
=
(
pSyncNode
->
pRaftCfg
->
configIndexArr
)[
0
];
SyncIndex
lastIndex
=
(
pSyncNode
->
pRaftCfg
->
configIndexArr
)[
0
];
for
(
int
i
=
0
;
i
<
pSyncNode
->
pRaftCfg
->
configIndexCount
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
pSyncNode
->
pRaftCfg
->
configIndexCount
;
++
i
)
{
if
((
pSyncNode
->
pRaftCfg
->
configIndexArr
)[
i
]
>
lastIndex
&&
if
((
pSyncNode
->
pRaftCfg
->
configIndexArr
)[
i
]
>
lastIndex
&&
(
pSyncNode
->
pRaftCfg
->
configIndexArr
)[
i
]
<=
snapshotLastApplyIndex
)
{
(
pSyncNode
->
pRaftCfg
->
configIndexArr
)[
i
]
<=
snapshotLastApplyIndex
)
{
lastIndex
=
(
pSyncNode
->
pRaftCfg
->
configIndexArr
)[
i
];
lastIndex
=
(
pSyncNode
->
pRaftCfg
->
configIndexArr
)[
i
];
...
@@ -600,7 +603,7 @@ void syncGetEpSet(int64_t rid, SEpSet* pEpSet) {
...
@@ -600,7 +603,7 @@ void syncGetEpSet(int64_t rid, SEpSet* pEpSet) {
}
}
ASSERT(rid == pSyncNode->rid);
ASSERT(rid == pSyncNode->rid);
pEpSet->numOfEps = 0;
pEpSet->numOfEps = 0;
for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
for (int
32_t
i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
snprintf(pEpSet->eps[i].fqdn, sizeof(pEpSet->eps[i].fqdn), "%s", (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodeFqdn);
snprintf(pEpSet->eps[i].fqdn, sizeof(pEpSet->eps[i].fqdn), "%s", (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodeFqdn);
pEpSet->eps[i].port = (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodePort;
pEpSet->eps[i].port = (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodePort;
(pEpSet->numOfEps)++;
(pEpSet->numOfEps)++;
...
@@ -621,7 +624,7 @@ void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
...
@@ -621,7 +624,7 @@ void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
}
}
pEpSet
->
numOfEps
=
0
;
pEpSet
->
numOfEps
=
0
;
for
(
int
i
=
0
;
i
<
pSyncNode
->
pRaftCfg
->
cfg
.
replicaNum
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
pSyncNode
->
pRaftCfg
->
cfg
.
replicaNum
;
++
i
)
{
snprintf
(
pEpSet
->
eps
[
i
].
fqdn
,
sizeof
(
pEpSet
->
eps
[
i
].
fqdn
),
"%s"
,
(
pSyncNode
->
pRaftCfg
->
cfg
.
nodeInfo
)[
i
].
nodeFqdn
);
snprintf
(
pEpSet
->
eps
[
i
].
fqdn
,
sizeof
(
pEpSet
->
eps
[
i
].
fqdn
),
"%s"
,
(
pSyncNode
->
pRaftCfg
->
cfg
.
nodeInfo
)[
i
].
nodeFqdn
);
pEpSet
->
eps
[
i
].
port
=
(
pSyncNode
->
pRaftCfg
->
cfg
.
nodeInfo
)[
i
].
nodePort
;
pEpSet
->
eps
[
i
].
port
=
(
pSyncNode
->
pRaftCfg
->
cfg
.
nodeInfo
)[
i
].
nodePort
;
(
pEpSet
->
numOfEps
)
++
;
(
pEpSet
->
numOfEps
)
++
;
...
@@ -710,7 +713,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
...
@@ -710,7 +713,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
return
ret
;
return
ret
;
}
}
int32_t
syncHbTimerInit
(
SSyncNode
*
pSyncNode
,
SSyncTimer
*
pSyncTimer
,
SRaftId
destId
)
{
static
int32_t
syncHbTimerInit
(
SSyncNode
*
pSyncNode
,
SSyncTimer
*
pSyncTimer
,
SRaftId
destId
)
{
pSyncTimer
->
pTimer
=
NULL
;
pSyncTimer
->
pTimer
=
NULL
;
pSyncTimer
->
counter
=
0
;
pSyncTimer
->
counter
=
0
;
pSyncTimer
->
timerMS
=
pSyncNode
->
hbBaseLine
;
pSyncTimer
->
timerMS
=
pSyncNode
->
hbBaseLine
;
...
@@ -720,7 +723,7 @@ int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId de
...
@@ -720,7 +723,7 @@ int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId de
return
0
;
return
0
;
}
}
int32_t
syncHbTimerStart
(
SSyncNode
*
pSyncNode
,
SSyncTimer
*
pSyncTimer
)
{
static
int32_t
syncHbTimerStart
(
SSyncNode
*
pSyncNode
,
SSyncTimer
*
pSyncTimer
)
{
int32_t
ret
=
0
;
int32_t
ret
=
0
;
if
(
syncIsInit
())
{
if
(
syncIsInit
())
{
SSyncHbTimerData
*
pData
=
taosMemoryMalloc
(
sizeof
(
SSyncHbTimerData
));
SSyncHbTimerData
*
pData
=
taosMemoryMalloc
(
sizeof
(
SSyncHbTimerData
));
...
@@ -737,7 +740,7 @@ int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
...
@@ -737,7 +740,7 @@ int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
return
ret
;
return
ret
;
}
}
int32_t
syncHbTimerStop
(
SSyncNode
*
pSyncNode
,
SSyncTimer
*
pSyncTimer
)
{
static
int32_t
syncHbTimerStop
(
SSyncNode
*
pSyncNode
,
SSyncTimer
*
pSyncTimer
)
{
int32_t
ret
=
0
;
int32_t
ret
=
0
;
atomic_add_fetch_64
(
&
pSyncTimer
->
logicClock
,
1
);
atomic_add_fetch_64
(
&
pSyncTimer
->
logicClock
,
1
);
taosTmrStop
(
pSyncTimer
->
pTimer
);
taosTmrStop
(
pSyncTimer
->
pTimer
);
...
@@ -837,14 +840,14 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
...
@@ -837,14 +840,14 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
// init peersNum, peers, peersId
// init peersNum, peers, peersId
pSyncNode
->
peersNum
=
pSyncNode
->
pRaftCfg
->
cfg
.
replicaNum
-
1
;
pSyncNode
->
peersNum
=
pSyncNode
->
pRaftCfg
->
cfg
.
replicaNum
-
1
;
int
j
=
0
;
int
32_t
j
=
0
;
for
(
int
i
=
0
;
i
<
pSyncNode
->
pRaftCfg
->
cfg
.
replicaNum
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
pSyncNode
->
pRaftCfg
->
cfg
.
replicaNum
;
++
i
)
{
if
(
i
!=
pSyncNode
->
pRaftCfg
->
cfg
.
myIndex
)
{
if
(
i
!=
pSyncNode
->
pRaftCfg
->
cfg
.
myIndex
)
{
pSyncNode
->
peersNodeInfo
[
j
]
=
pSyncNode
->
pRaftCfg
->
cfg
.
nodeInfo
[
i
];
pSyncNode
->
peersNodeInfo
[
j
]
=
pSyncNode
->
pRaftCfg
->
cfg
.
nodeInfo
[
i
];
j
++
;
j
++
;
}
}
}
}
for
(
int
i
=
0
;
i
<
pSyncNode
->
peersNum
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
pSyncNode
->
peersNum
;
++
i
)
{
if
(
!
syncUtilnodeInfo2raftId
(
&
pSyncNode
->
peersNodeInfo
[
i
],
pSyncNode
->
vgId
,
&
pSyncNode
->
peersId
[
i
]))
{
if
(
!
syncUtilnodeInfo2raftId
(
&
pSyncNode
->
peersNodeInfo
[
i
],
pSyncNode
->
vgId
,
&
pSyncNode
->
peersId
[
i
]))
{
sError
(
"vgId:%d, failed to determine raft member id, peer:%d"
,
pSyncNode
->
vgId
,
i
);
sError
(
"vgId:%d, failed to determine raft member id, peer:%d"
,
pSyncNode
->
vgId
,
i
);
goto
_error
;
goto
_error
;
...
@@ -853,7 +856,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
...
@@ -853,7 +856,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
// init replicaNum, replicasId
// init replicaNum, replicasId
pSyncNode
->
replicaNum
=
pSyncNode
->
pRaftCfg
->
cfg
.
replicaNum
;
pSyncNode
->
replicaNum
=
pSyncNode
->
pRaftCfg
->
cfg
.
replicaNum
;
for
(
int
i
=
0
;
i
<
pSyncNode
->
pRaftCfg
->
cfg
.
replicaNum
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
pSyncNode
->
pRaftCfg
->
cfg
.
replicaNum
;
++
i
)
{
if
(
!
syncUtilnodeInfo2raftId
(
&
pSyncNode
->
pRaftCfg
->
cfg
.
nodeInfo
[
i
],
pSyncNode
->
vgId
,
&
pSyncNode
->
replicasId
[
i
]))
{
if
(
!
syncUtilnodeInfo2raftId
(
&
pSyncNode
->
pRaftCfg
->
cfg
.
nodeInfo
[
i
],
pSyncNode
->
vgId
,
&
pSyncNode
->
replicasId
[
i
]))
{
sError
(
"vgId:%d, failed to determine raft member id, replica:%d"
,
pSyncNode
->
vgId
,
i
);
sError
(
"vgId:%d, failed to determine raft member id, replica:%d"
,
pSyncNode
->
vgId
,
i
);
goto
_error
;
goto
_error
;
...
@@ -1002,7 +1005,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
...
@@ -1002,7 +1005,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
pSyncNode
->
restoreFinish
=
false
;
pSyncNode
->
restoreFinish
=
false
;
// snapshot senders
// snapshot senders
for
(
int
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
SSyncSnapshotSender
*
pSender
=
snapshotSenderCreate
(
pSyncNode
,
i
);
SSyncSnapshotSender
*
pSender
=
snapshotSenderCreate
(
pSyncNode
,
i
);
// ASSERT(pSender != NULL);
// ASSERT(pSender != NULL);
(
pSyncNode
->
senders
)[
i
]
=
pSender
;
(
pSyncNode
->
senders
)[
i
]
=
pSender
;
...
@@ -1133,7 +1136,7 @@ void syncNodeClose(SSyncNode* pSyncNode) {
...
@@ -1133,7 +1136,7 @@ void syncNodeClose(SSyncNode* pSyncNode) {
taosMemoryFree
(
pSyncNode
->
pFsm
);
taosMemoryFree
(
pSyncNode
->
pFsm
);
}
}
for
(
int
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
if
((
pSyncNode
->
senders
)[
i
]
!=
NULL
)
{
if
((
pSyncNode
->
senders
)[
i
]
!=
NULL
)
{
snapshotSenderDestroy
((
pSyncNode
->
senders
)[
i
]);
snapshotSenderDestroy
((
pSyncNode
->
senders
)[
i
]);
(
pSyncNode
->
senders
)[
i
]
=
NULL
;
(
pSyncNode
->
senders
)[
i
]
=
NULL
;
...
@@ -1178,7 +1181,7 @@ int32_t syncNodePingSelf(SSyncNode* pSyncNode) {
...
@@ -1178,7 +1181,7 @@ int32_t syncNodePingSelf(SSyncNode* pSyncNode) {
int32_t
syncNodePingPeers
(
SSyncNode
*
pSyncNode
)
{
int32_t
syncNodePingPeers
(
SSyncNode
*
pSyncNode
)
{
int32_t
ret
=
0
;
int32_t
ret
=
0
;
for
(
int
i
=
0
;
i
<
pSyncNode
->
peersNum
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
pSyncNode
->
peersNum
;
++
i
)
{
SRaftId
*
destId
=
&
(
pSyncNode
->
peersId
[
i
]);
SRaftId
*
destId
=
&
(
pSyncNode
->
peersId
[
i
]);
SyncPing
*
pMsg
=
syncPingBuild3
(
&
pSyncNode
->
myRaftId
,
destId
,
pSyncNode
->
vgId
);
SyncPing
*
pMsg
=
syncPingBuild3
(
&
pSyncNode
->
myRaftId
,
destId
,
pSyncNode
->
vgId
);
ret
=
syncNodePing
(
pSyncNode
,
destId
,
pMsg
);
ret
=
syncNodePing
(
pSyncNode
,
destId
,
pMsg
);
...
@@ -1190,7 +1193,7 @@ int32_t syncNodePingPeers(SSyncNode* pSyncNode) {
...
@@ -1190,7 +1193,7 @@ int32_t syncNodePingPeers(SSyncNode* pSyncNode) {
int32_t
syncNodePingAll
(
SSyncNode
*
pSyncNode
)
{
int32_t
syncNodePingAll
(
SSyncNode
*
pSyncNode
)
{
int32_t
ret
=
0
;
int32_t
ret
=
0
;
for
(
int
i
=
0
;
i
<
pSyncNode
->
pRaftCfg
->
cfg
.
replicaNum
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
pSyncNode
->
pRaftCfg
->
cfg
.
replicaNum
;
++
i
)
{
SRaftId
*
destId
=
&
(
pSyncNode
->
replicasId
[
i
]);
SRaftId
*
destId
=
&
(
pSyncNode
->
replicasId
[
i
]);
SyncPing
*
pMsg
=
syncPingBuild3
(
&
pSyncNode
->
myRaftId
,
destId
,
pSyncNode
->
vgId
);
SyncPing
*
pMsg
=
syncPingBuild3
(
&
pSyncNode
->
myRaftId
,
destId
,
pSyncNode
->
vgId
);
ret
=
syncNodePing
(
pSyncNode
,
destId
,
pMsg
);
ret
=
syncNodePing
(
pSyncNode
,
destId
,
pMsg
);
...
@@ -1294,7 +1297,7 @@ int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
...
@@ -1294,7 +1297,7 @@ int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
ret = syncNodeDoStartHeartbeatTimer(pSyncNode);
ret = syncNodeDoStartHeartbeatTimer(pSyncNode);
#endif
#endif
for
(
int
i
=
0
;
i
<
pSyncNode
->
peersNum
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
pSyncNode
->
peersNum
;
++
i
)
{
SSyncTimer
*
pSyncTimer
=
syncNodeGetHbTimer
(
pSyncNode
,
&
(
pSyncNode
->
peersId
[
i
]));
SSyncTimer
*
pSyncTimer
=
syncNodeGetHbTimer
(
pSyncNode
,
&
(
pSyncNode
->
peersId
[
i
]));
if
(
pSyncTimer
!=
NULL
)
{
if
(
pSyncTimer
!=
NULL
)
{
syncHbTimerStart
(
pSyncNode
,
pSyncTimer
);
syncHbTimerStart
(
pSyncNode
,
pSyncTimer
);
...
@@ -1313,7 +1316,7 @@ int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
...
@@ -1313,7 +1316,7 @@ int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
pSyncNode->pHeartbeatTimer = NULL;
pSyncNode->pHeartbeatTimer = NULL;
#endif
#endif
for
(
int
i
=
0
;
i
<
pSyncNode
->
peersNum
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
pSyncNode
->
peersNum
;
++
i
)
{
SSyncTimer
*
pSyncTimer
=
syncNodeGetHbTimer
(
pSyncNode
,
&
(
pSyncNode
->
peersId
[
i
]));
SSyncTimer
*
pSyncTimer
=
syncNodeGetHbTimer
(
pSyncNode
,
&
(
pSyncNode
->
peersId
[
i
]));
if
(
pSyncTimer
!=
NULL
)
{
if
(
pSyncTimer
!=
NULL
)
{
syncHbTimerStop
(
pSyncNode
,
pSyncTimer
);
syncHbTimerStop
(
pSyncNode
,
pSyncTimer
);
...
@@ -1396,19 +1399,19 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
...
@@ -1396,19 +1399,19 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
cJSON_AddNumberToObject
(
pRoot
,
"peersNum"
,
pSyncNode
->
peersNum
);
cJSON_AddNumberToObject
(
pRoot
,
"peersNum"
,
pSyncNode
->
peersNum
);
cJSON
*
pPeers
=
cJSON_CreateArray
();
cJSON
*
pPeers
=
cJSON_CreateArray
();
cJSON_AddItemToObject
(
pRoot
,
"peersNodeInfo"
,
pPeers
);
cJSON_AddItemToObject
(
pRoot
,
"peersNodeInfo"
,
pPeers
);
for
(
int
i
=
0
;
i
<
pSyncNode
->
peersNum
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
pSyncNode
->
peersNum
;
++
i
)
{
cJSON_AddItemToArray
(
pPeers
,
syncUtilNodeInfo2Json
(
&
pSyncNode
->
peersNodeInfo
[
i
]));
cJSON_AddItemToArray
(
pPeers
,
syncUtilNodeInfo2Json
(
&
pSyncNode
->
peersNodeInfo
[
i
]));
}
}
cJSON
*
pPeersId
=
cJSON_CreateArray
();
cJSON
*
pPeersId
=
cJSON_CreateArray
();
cJSON_AddItemToObject
(
pRoot
,
"peersId"
,
pPeersId
);
cJSON_AddItemToObject
(
pRoot
,
"peersId"
,
pPeersId
);
for
(
int
i
=
0
;
i
<
pSyncNode
->
peersNum
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
pSyncNode
->
peersNum
;
++
i
)
{
cJSON_AddItemToArray
(
pPeersId
,
syncUtilRaftId2Json
(
&
pSyncNode
->
peersId
[
i
]));
cJSON_AddItemToArray
(
pPeersId
,
syncUtilRaftId2Json
(
&
pSyncNode
->
peersId
[
i
]));
}
}
cJSON_AddNumberToObject
(
pRoot
,
"replicaNum"
,
pSyncNode
->
replicaNum
);
cJSON_AddNumberToObject
(
pRoot
,
"replicaNum"
,
pSyncNode
->
replicaNum
);
cJSON
*
pReplicasId
=
cJSON_CreateArray
();
cJSON
*
pReplicasId
=
cJSON_CreateArray
();
cJSON_AddItemToObject
(
pRoot
,
"replicasId"
,
pReplicasId
);
cJSON_AddItemToObject
(
pRoot
,
"replicasId"
,
pReplicasId
);
for
(
int
i
=
0
;
i
<
pSyncNode
->
replicaNum
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
pSyncNode
->
replicaNum
;
++
i
)
{
cJSON_AddItemToArray
(
pReplicasId
,
syncUtilRaftId2Json
(
&
pSyncNode
->
replicasId
[
i
]));
cJSON_AddItemToArray
(
pReplicasId
,
syncUtilRaftId2Json
(
&
pSyncNode
->
replicasId
[
i
]));
}
}
...
@@ -1505,7 +1508,7 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
...
@@ -1505,7 +1508,7 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
// snapshot senders
// snapshot senders
cJSON
*
pSenders
=
cJSON_CreateArray
();
cJSON
*
pSenders
=
cJSON_CreateArray
();
cJSON_AddItemToObject
(
pRoot
,
"senders"
,
pSenders
);
cJSON_AddItemToObject
(
pRoot
,
"senders"
,
pSenders
);
for
(
int
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
cJSON_AddItemToArray
(
pSenders
,
snapshotSender2Json
((
pSyncNode
->
senders
)[
i
]));
cJSON_AddItemToArray
(
pSenders
,
snapshotSender2Json
((
pSyncNode
->
senders
)[
i
]));
}
}
...
@@ -1530,8 +1533,8 @@ char* syncNode2Str(const SSyncNode* pSyncNode) {
...
@@ -1530,8 +1533,8 @@ char* syncNode2Str(const SSyncNode* pSyncNode) {
}
}
inline
char
*
syncNode2SimpleStr
(
const
SSyncNode
*
pSyncNode
)
{
inline
char
*
syncNode2SimpleStr
(
const
SSyncNode
*
pSyncNode
)
{
int
len
=
256
;
int
32_t
len
=
256
;
char
*
s
=
(
char
*
)
taosMemoryMalloc
(
len
);
char
*
s
=
(
char
*
)
taosMemoryMalloc
(
len
);
SSnapshot
snapshot
=
{.
data
=
NULL
,
.
lastApplyIndex
=
-
1
,
.
lastApplyTerm
=
0
};
SSnapshot
snapshot
=
{.
data
=
NULL
,
.
lastApplyIndex
=
-
1
,
.
lastApplyTerm
=
0
};
if
(
pSyncNode
->
pFsm
->
FpGetSnapshotInfo
!=
NULL
)
{
if
(
pSyncNode
->
pFsm
->
FpGetSnapshotInfo
!=
NULL
)
{
...
@@ -1556,7 +1559,7 @@ inline bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) {
...
@@ -1556,7 +1559,7 @@ inline bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) {
bool
b1
=
false
;
bool
b1
=
false
;
bool
b2
=
false
;
bool
b2
=
false
;
for
(
int
i
=
0
;
i
<
config
->
replicaNum
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
config
->
replicaNum
;
++
i
)
{
if
(
strcmp
((
config
->
nodeInfo
)[
i
].
nodeFqdn
,
pSyncNode
->
myNodeInfo
.
nodeFqdn
)
==
0
&&
if
(
strcmp
((
config
->
nodeInfo
)[
i
].
nodeFqdn
,
pSyncNode
->
myNodeInfo
.
nodeFqdn
)
==
0
&&
(
config
->
nodeInfo
)[
i
].
nodePort
==
pSyncNode
->
myNodeInfo
.
nodePort
)
{
(
config
->
nodeInfo
)[
i
].
nodePort
==
pSyncNode
->
myNodeInfo
.
nodePort
)
{
b1
=
true
;
b1
=
true
;
...
@@ -1564,7 +1567,7 @@ inline bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) {
...
@@ -1564,7 +1567,7 @@ inline bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) {
}
}
}
}
for
(
int
i
=
0
;
i
<
config
->
replicaNum
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
config
->
replicaNum
;
++
i
)
{
SRaftId
raftId
;
SRaftId
raftId
;
raftId
.
addr
=
syncUtilAddr2U64
((
config
->
nodeInfo
)[
i
].
nodeFqdn
,
(
config
->
nodeInfo
)[
i
].
nodePort
);
raftId
.
addr
=
syncUtilAddr2U64
((
config
->
nodeInfo
)[
i
].
nodeFqdn
,
(
config
->
nodeInfo
)[
i
].
nodePort
);
raftId
.
vgId
=
pSyncNode
->
vgId
;
raftId
.
vgId
=
pSyncNode
->
vgId
;
...
@@ -1646,7 +1649,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
...
@@ -1646,7 +1649,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
SRaftId
oldReplicasId
[
TSDB_MAX_REPLICA
];
SRaftId
oldReplicasId
[
TSDB_MAX_REPLICA
];
memcpy
(
oldReplicasId
,
pSyncNode
->
replicasId
,
sizeof
(
oldReplicasId
));
memcpy
(
oldReplicasId
,
pSyncNode
->
replicasId
,
sizeof
(
oldReplicasId
));
SSyncSnapshotSender
*
oldSenders
[
TSDB_MAX_REPLICA
];
SSyncSnapshotSender
*
oldSenders
[
TSDB_MAX_REPLICA
];
for
(
int
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
oldSenders
[
i
]
=
(
pSyncNode
->
senders
)[
i
];
oldSenders
[
i
]
=
(
pSyncNode
->
senders
)[
i
];
sSTrace
(
oldSenders
[
i
],
"snapshot sender save old"
);
sSTrace
(
oldSenders
[
i
],
"snapshot sender save old"
);
}
}
...
@@ -1657,20 +1660,20 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
...
@@ -1657,20 +1660,20 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
// init peersNum, peers, peersId
// init peersNum, peers, peersId
pSyncNode
->
peersNum
=
pSyncNode
->
pRaftCfg
->
cfg
.
replicaNum
-
1
;
pSyncNode
->
peersNum
=
pSyncNode
->
pRaftCfg
->
cfg
.
replicaNum
-
1
;
int
j
=
0
;
int
32_t
j
=
0
;
for
(
int
i
=
0
;
i
<
pSyncNode
->
pRaftCfg
->
cfg
.
replicaNum
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
pSyncNode
->
pRaftCfg
->
cfg
.
replicaNum
;
++
i
)
{
if
(
i
!=
pSyncNode
->
pRaftCfg
->
cfg
.
myIndex
)
{
if
(
i
!=
pSyncNode
->
pRaftCfg
->
cfg
.
myIndex
)
{
pSyncNode
->
peersNodeInfo
[
j
]
=
pSyncNode
->
pRaftCfg
->
cfg
.
nodeInfo
[
i
];
pSyncNode
->
peersNodeInfo
[
j
]
=
pSyncNode
->
pRaftCfg
->
cfg
.
nodeInfo
[
i
];
j
++
;
j
++
;
}
}
}
}
for
(
int
i
=
0
;
i
<
pSyncNode
->
peersNum
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
pSyncNode
->
peersNum
;
++
i
)
{
syncUtilnodeInfo2raftId
(
&
pSyncNode
->
peersNodeInfo
[
i
],
pSyncNode
->
vgId
,
&
pSyncNode
->
peersId
[
i
]);
syncUtilnodeInfo2raftId
(
&
pSyncNode
->
peersNodeInfo
[
i
],
pSyncNode
->
vgId
,
&
pSyncNode
->
peersId
[
i
]);
}
}
// init replicaNum, replicasId
// init replicaNum, replicasId
pSyncNode
->
replicaNum
=
pSyncNode
->
pRaftCfg
->
cfg
.
replicaNum
;
pSyncNode
->
replicaNum
=
pSyncNode
->
pRaftCfg
->
cfg
.
replicaNum
;
for
(
int
i
=
0
;
i
<
pSyncNode
->
pRaftCfg
->
cfg
.
replicaNum
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
pSyncNode
->
pRaftCfg
->
cfg
.
replicaNum
;
++
i
)
{
syncUtilnodeInfo2raftId
(
&
pSyncNode
->
pRaftCfg
->
cfg
.
nodeInfo
[
i
],
pSyncNode
->
vgId
,
&
pSyncNode
->
replicasId
[
i
]);
syncUtilnodeInfo2raftId
(
&
pSyncNode
->
pRaftCfg
->
cfg
.
nodeInfo
[
i
],
pSyncNode
->
vgId
,
&
pSyncNode
->
replicasId
[
i
]);
}
}
...
@@ -1685,15 +1688,15 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
...
@@ -1685,15 +1688,15 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
// reset snapshot senders
// reset snapshot senders
// clear new
// clear new
for
(
int
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
(
pSyncNode
->
senders
)[
i
]
=
NULL
;
(
pSyncNode
->
senders
)[
i
]
=
NULL
;
}
}
// reset new
// reset new
for
(
int
i
=
0
;
i
<
pSyncNode
->
replicaNum
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
pSyncNode
->
replicaNum
;
++
i
)
{
// reset sender
// reset sender
bool
reset
=
false
;
bool
reset
=
false
;
for
(
int
j
=
0
;
j
<
TSDB_MAX_REPLICA
;
++
j
)
{
for
(
int
32_t
j
=
0
;
j
<
TSDB_MAX_REPLICA
;
++
j
)
{
if
(
syncUtilSameId
(
&
(
pSyncNode
->
replicasId
)[
i
],
&
oldReplicasId
[
j
]))
{
if
(
syncUtilSameId
(
&
(
pSyncNode
->
replicasId
)[
i
],
&
oldReplicasId
[
j
]))
{
char
host
[
128
];
char
host
[
128
];
uint16_t
port
;
uint16_t
port
;
...
@@ -1716,7 +1719,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
...
@@ -1716,7 +1719,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
}
}
// create new
// create new
for
(
int
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
if
((
pSyncNode
->
senders
)[
i
]
==
NULL
)
{
if
((
pSyncNode
->
senders
)[
i
]
==
NULL
)
{
(
pSyncNode
->
senders
)[
i
]
=
snapshotSenderCreate
(
pSyncNode
,
i
);
(
pSyncNode
->
senders
)[
i
]
=
snapshotSenderCreate
(
pSyncNode
,
i
);
sSTrace
((
pSyncNode
->
senders
)[
i
],
"snapshot sender create new"
);
sSTrace
((
pSyncNode
->
senders
)[
i
],
"snapshot sender create new"
);
...
@@ -1724,7 +1727,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
...
@@ -1724,7 +1727,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
}
}
// free old
// free old
for
(
int
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
if
(
oldSenders
[
i
]
!=
NULL
)
{
if
(
oldSenders
[
i
]
!=
NULL
)
{
snapshotSenderDestroy
(
oldSenders
[
i
]);
snapshotSenderDestroy
(
oldSenders
[
i
]);
sNTrace
(
pSyncNode
,
"snapshot sender delete old %p replica-index:%d"
,
oldSenders
[
i
],
i
);
sNTrace
(
pSyncNode
,
"snapshot sender delete old %p replica-index:%d"
,
oldSenders
[
i
],
i
);
...
@@ -1865,7 +1868,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
...
@@ -1865,7 +1868,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
// set leader cache
// set leader cache
pSyncNode
->
leaderCache
=
pSyncNode
->
myRaftId
;
pSyncNode
->
leaderCache
=
pSyncNode
->
myRaftId
;
for
(
int
i
=
0
;
i
<
pSyncNode
->
pNextIndex
->
replicaNum
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
pSyncNode
->
pNextIndex
->
replicaNum
;
++
i
)
{
// maybe overwrite myself, no harm
// maybe overwrite myself, no harm
// just do it!
// just do it!
...
@@ -1879,7 +1882,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
...
@@ -1879,7 +1882,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
pSyncNode
->
pNextIndex
->
index
[
i
]
=
lastIndex
+
1
;
pSyncNode
->
pNextIndex
->
index
[
i
]
=
lastIndex
+
1
;
}
}
for
(
int
i
=
0
;
i
<
pSyncNode
->
pMatchIndex
->
replicaNum
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
pSyncNode
->
pMatchIndex
->
replicaNum
;
++
i
)
{
// maybe overwrite myself, no harm
// maybe overwrite myself, no harm
// just do it!
// just do it!
pSyncNode
->
pMatchIndex
->
index
[
i
]
=
SYNC_INDEX_INVALID
;
pSyncNode
->
pMatchIndex
->
index
[
i
]
=
SYNC_INDEX_INVALID
;
...
@@ -1892,7 +1895,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
...
@@ -1892,7 +1895,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
// update sender private term
// update sender private term
SSyncSnapshotSender* pMySender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->myRaftId));
SSyncSnapshotSender* pMySender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->myRaftId));
if (pMySender != NULL) {
if (pMySender != NULL) {
for (int i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
for (int
32_t
i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
if ((pSyncNode->senders)[i]->privateTerm > pMySender->privateTerm) {
if ((pSyncNode->senders)[i]->privateTerm > pMySender->privateTerm) {
pMySender->privateTerm = (pSyncNode->senders)[i]->privateTerm;
pMySender->privateTerm = (pSyncNode->senders)[i]->privateTerm;
}
}
...
@@ -1946,7 +1949,7 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
...
@@ -1946,7 +1949,7 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
bool
syncNodeIsMnode
(
SSyncNode
*
pSyncNode
)
{
return
(
pSyncNode
->
vgId
==
1
);
}
bool
syncNodeIsMnode
(
SSyncNode
*
pSyncNode
)
{
return
(
pSyncNode
->
vgId
==
1
);
}
int32_t
syncNodePeerStateInit
(
SSyncNode
*
pSyncNode
)
{
int32_t
syncNodePeerStateInit
(
SSyncNode
*
pSyncNode
)
{
for
(
int
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
pSyncNode
->
peerStates
[
i
].
lastSendIndex
=
SYNC_INDEX_INVALID
;
pSyncNode
->
peerStates
[
i
].
lastSendIndex
=
SYNC_INDEX_INVALID
;
pSyncNode
->
peerStates
[
i
].
lastSendTime
=
0
;
pSyncNode
->
peerStates
[
i
].
lastSendTime
=
0
;
}
}
...
@@ -2332,8 +2335,8 @@ static int32_t syncNodeEqNoop(SSyncNode* ths) {
...
@@ -2332,8 +2335,8 @@ static int32_t syncNodeEqNoop(SSyncNode* ths) {
static
void
deleteCacheEntry
(
const
void
*
key
,
size_t
keyLen
,
void
*
value
)
{
taosMemoryFree
(
value
);
}
static
void
deleteCacheEntry
(
const
void
*
key
,
size_t
keyLen
,
void
*
value
)
{
taosMemoryFree
(
value
);
}
static
int32_t
syncCacheEntry
(
SSyncLogStore
*
pLogStore
,
SSyncRaftEntry
*
pEntry
,
LRUHandle
**
h
)
{
static
int32_t
syncCacheEntry
(
SSyncLogStore
*
pLogStore
,
SSyncRaftEntry
*
pEntry
,
LRUHandle
**
h
)
{
int
code
=
0
;
int
32_t
code
=
0
;
int
entryLen
=
sizeof
(
*
pEntry
)
+
pEntry
->
dataLen
;
int
32_t
entryLen
=
sizeof
(
*
pEntry
)
+
pEntry
->
dataLen
;
LRUStatus
status
=
taosLRUCacheInsert
(
pLogStore
->
pCache
,
&
pEntry
->
index
,
sizeof
(
pEntry
->
index
),
pEntry
,
entryLen
,
LRUStatus
status
=
taosLRUCacheInsert
(
pLogStore
->
pCache
,
&
pEntry
->
index
,
sizeof
(
pEntry
->
index
),
pEntry
,
entryLen
,
deleteCacheEntry
,
h
,
TAOS_LRU_PRIORITY_LOW
);
deleteCacheEntry
,
h
,
TAOS_LRU_PRIORITY_LOW
);
if
(
status
!=
TAOS_LRU_STATUS_OK
)
{
if
(
status
!=
TAOS_LRU_STATUS_OK
)
{
...
@@ -2641,7 +2644,7 @@ int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* p
...
@@ -2641,7 +2644,7 @@ int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* p
}
}
int32_t
syncNodeUpdateNewConfigIndex
(
SSyncNode
*
ths
,
SSyncCfg
*
pNewCfg
)
{
int32_t
syncNodeUpdateNewConfigIndex
(
SSyncNode
*
ths
,
SSyncCfg
*
pNewCfg
)
{
for
(
int
i
=
0
;
i
<
pNewCfg
->
replicaNum
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
pNewCfg
->
replicaNum
;
++
i
)
{
SRaftId
raftId
;
SRaftId
raftId
;
raftId
.
addr
=
syncUtilAddr2U64
((
pNewCfg
->
nodeInfo
)[
i
].
nodeFqdn
,
(
pNewCfg
->
nodeInfo
)[
i
].
nodePort
);
raftId
.
addr
=
syncUtilAddr2U64
((
pNewCfg
->
nodeInfo
)[
i
].
nodeFqdn
,
(
pNewCfg
->
nodeInfo
)[
i
].
nodePort
);
raftId
.
vgId
=
ths
->
vgId
;
raftId
.
vgId
=
ths
->
vgId
;
...
@@ -2772,7 +2775,7 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde
...
@@ -2772,7 +2775,7 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde
}
}
bool
syncNodeInRaftGroup
(
SSyncNode
*
ths
,
SRaftId
*
pRaftId
)
{
bool
syncNodeInRaftGroup
(
SSyncNode
*
ths
,
SRaftId
*
pRaftId
)
{
for
(
int
i
=
0
;
i
<
ths
->
replicaNum
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
ths
->
replicaNum
;
++
i
)
{
if
(
syncUtilSameId
(
&
((
ths
->
replicasId
)[
i
]),
pRaftId
))
{
if
(
syncUtilSameId
(
&
((
ths
->
replicasId
)[
i
]),
pRaftId
))
{
return
true
;
return
true
;
}
}
...
@@ -2782,7 +2785,7 @@ bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
...
@@ -2782,7 +2785,7 @@ bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
SSyncSnapshotSender
*
syncNodeGetSnapshotSender
(
SSyncNode
*
ths
,
SRaftId
*
pDestId
)
{
SSyncSnapshotSender
*
syncNodeGetSnapshotSender
(
SSyncNode
*
ths
,
SRaftId
*
pDestId
)
{
SSyncSnapshotSender
*
pSender
=
NULL
;
SSyncSnapshotSender
*
pSender
=
NULL
;
for
(
int
i
=
0
;
i
<
ths
->
replicaNum
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
ths
->
replicaNum
;
++
i
)
{
if
(
syncUtilSameId
(
pDestId
,
&
((
ths
->
replicasId
)[
i
])))
{
if
(
syncUtilSameId
(
pDestId
,
&
((
ths
->
replicasId
)[
i
])))
{
pSender
=
(
ths
->
senders
)[
i
];
pSender
=
(
ths
->
senders
)[
i
];
}
}
...
@@ -2792,7 +2795,7 @@ SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId)
...
@@ -2792,7 +2795,7 @@ SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId)
SSyncTimer
*
syncNodeGetHbTimer
(
SSyncNode
*
ths
,
SRaftId
*
pDestId
)
{
SSyncTimer
*
syncNodeGetHbTimer
(
SSyncNode
*
ths
,
SRaftId
*
pDestId
)
{
SSyncTimer
*
pTimer
=
NULL
;
SSyncTimer
*
pTimer
=
NULL
;
for
(
int
i
=
0
;
i
<
ths
->
replicaNum
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
ths
->
replicaNum
;
++
i
)
{
if
(
syncUtilSameId
(
pDestId
,
&
((
ths
->
replicasId
)[
i
])))
{
if
(
syncUtilSameId
(
pDestId
,
&
((
ths
->
replicasId
)[
i
])))
{
pTimer
=
&
((
ths
->
peerHeartbeatTimerArr
)[
i
]);
pTimer
=
&
((
ths
->
peerHeartbeatTimerArr
)[
i
]);
}
}
...
@@ -2802,7 +2805,7 @@ SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) {
...
@@ -2802,7 +2805,7 @@ SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) {
SPeerState
*
syncNodeGetPeerState
(
SSyncNode
*
ths
,
const
SRaftId
*
pDestId
)
{
SPeerState
*
syncNodeGetPeerState
(
SSyncNode
*
ths
,
const
SRaftId
*
pDestId
)
{
SPeerState
*
pState
=
NULL
;
SPeerState
*
pState
=
NULL
;
for
(
int
i
=
0
;
i
<
ths
->
replicaNum
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
ths
->
replicaNum
;
++
i
)
{
if
(
syncUtilSameId
(
pDestId
,
&
((
ths
->
replicasId
)[
i
])))
{
if
(
syncUtilSameId
(
pDestId
,
&
((
ths
->
replicasId
)[
i
])))
{
pState
=
&
((
ths
->
peerStates
)[
i
]);
pState
=
&
((
ths
->
peerStates
)[
i
]);
}
}
...
@@ -2841,7 +2844,7 @@ bool syncNodeCanChange(SSyncNode* pSyncNode) {
...
@@ -2841,7 +2844,7 @@ bool syncNodeCanChange(SSyncNode* pSyncNode) {
}
}
}
}
for
(
int
i
=
0
;
i
<
pSyncNode
->
peersNum
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
pSyncNode
->
peersNum
;
++
i
)
{
SSyncSnapshotSender
*
pSender
=
syncNodeGetSnapshotSender
(
pSyncNode
,
&
(
pSyncNode
->
peersId
)[
i
]);
SSyncSnapshotSender
*
pSender
=
syncNodeGetSnapshotSender
(
pSyncNode
,
&
(
pSyncNode
->
peersId
)[
i
]);
if
(
pSender
!=
NULL
&&
pSender
->
start
)
{
if
(
pSender
!=
NULL
&&
pSender
->
start
)
{
sError
(
"sync cannot change3"
);
sError
(
"sync cannot change3"
);
...
...
source/libs/sync/src/syncUtil.c
浏览文件 @
9914657c
...
@@ -142,7 +142,7 @@ char* syncUtilPrintBin(char* ptr, uint32_t len) {
...
@@ -142,7 +142,7 @@ char* syncUtilPrintBin(char* ptr, uint32_t len) {
memset
(
s
,
0
,
len
+
1
);
memset
(
s
,
0
,
len
+
1
);
memcpy
(
s
,
ptr
,
len
);
memcpy
(
s
,
ptr
,
len
);
for
(
int
i
=
0
;
i
<
len
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
len
;
++
i
)
{
if
(
!
syncUtilCanPrint
(
s
[
i
]))
{
if
(
!
syncUtilCanPrint
(
s
[
i
]))
{
s
[
i
]
=
'.'
;
s
[
i
]
=
'.'
;
}
}
...
@@ -157,8 +157,8 @@ char* syncUtilPrintBin2(char* ptr, uint32_t len) {
...
@@ -157,8 +157,8 @@ char* syncUtilPrintBin2(char* ptr, uint32_t len) {
memset
(
s
,
0
,
len2
);
memset
(
s
,
0
,
len2
);
char
*
p
=
s
;
char
*
p
=
s
;
for
(
int
i
=
0
;
i
<
len
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
len
;
++
i
)
{
int
n
=
sprintf
(
p
,
"%d,"
,
ptr
[
i
]);
int
32_t
n
=
sprintf
(
p
,
"%d,"
,
ptr
[
i
]);
p
+=
n
;
p
+=
n
;
}
}
return
s
;
return
s
;
...
...
source/libs/sync/src/syncVoteMgr.c
浏览文件 @
9914657c
...
@@ -74,8 +74,8 @@ void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg) {
...
@@ -74,8 +74,8 @@ void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg) {
ASSERT
(
syncUtilSameId
(
&
pVotesGranted
->
pSyncNode
->
myRaftId
,
&
pMsg
->
destId
));
ASSERT
(
syncUtilSameId
(
&
pVotesGranted
->
pSyncNode
->
myRaftId
,
&
pMsg
->
destId
));
int
j
=
-
1
;
int
32_t
j
=
-
1
;
for
(
int
i
=
0
;
i
<
pVotesGranted
->
replicaNum
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
pVotesGranted
->
replicaNum
;
++
i
)
{
if
(
syncUtilSameId
(
&
((
*
(
pVotesGranted
->
replicas
))[
i
]),
&
(
pMsg
->
srcId
)))
{
if
(
syncUtilSameId
(
&
((
*
(
pVotesGranted
->
replicas
))[
i
]),
&
(
pMsg
->
srcId
)))
{
j
=
i
;
j
=
i
;
break
;
break
;
...
@@ -105,11 +105,11 @@ cJSON *voteGranted2Json(SVotesGranted *pVotesGranted) {
...
@@ -105,11 +105,11 @@ cJSON *voteGranted2Json(SVotesGranted *pVotesGranted) {
cJSON_AddNumberToObject
(
pRoot
,
"replicaNum"
,
pVotesGranted
->
replicaNum
);
cJSON_AddNumberToObject
(
pRoot
,
"replicaNum"
,
pVotesGranted
->
replicaNum
);
cJSON
*
pReplicas
=
cJSON_CreateArray
();
cJSON
*
pReplicas
=
cJSON_CreateArray
();
cJSON_AddItemToObject
(
pRoot
,
"replicas"
,
pReplicas
);
cJSON_AddItemToObject
(
pRoot
,
"replicas"
,
pReplicas
);
for
(
int
i
=
0
;
i
<
pVotesGranted
->
replicaNum
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
pVotesGranted
->
replicaNum
;
++
i
)
{
cJSON_AddItemToArray
(
pReplicas
,
syncUtilRaftId2Json
(
&
(
*
(
pVotesGranted
->
replicas
))[
i
]));
cJSON_AddItemToArray
(
pReplicas
,
syncUtilRaftId2Json
(
&
(
*
(
pVotesGranted
->
replicas
))[
i
]));
}
}
int
*
arr
=
(
int
*
)
taosMemoryMalloc
(
sizeof
(
in
t
)
*
pVotesGranted
->
replicaNum
);
int
32_t
*
arr
=
(
int32_t
*
)
taosMemoryMalloc
(
sizeof
(
int32_
t
)
*
pVotesGranted
->
replicaNum
);
for
(
int
i
=
0
;
i
<
pVotesGranted
->
replicaNum
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
pVotesGranted
->
replicaNum
;
++
i
)
{
arr
[
i
]
=
pVotesGranted
->
isGranted
[
i
];
arr
[
i
]
=
pVotesGranted
->
isGranted
[
i
];
}
}
cJSON
*
pIsGranted
=
cJSON_CreateIntArray
(
arr
,
pVotesGranted
->
replicaNum
);
cJSON
*
pIsGranted
=
cJSON_CreateIntArray
(
arr
,
pVotesGranted
->
replicaNum
);
...
@@ -168,7 +168,7 @@ void votesRespondUpdate(SVotesRespond *pVotesRespond, SSyncNode *pSyncNode) {
...
@@ -168,7 +168,7 @@ void votesRespondUpdate(SVotesRespond *pVotesRespond, SSyncNode *pSyncNode) {
bool
votesResponded
(
SVotesRespond
*
pVotesRespond
,
const
SRaftId
*
pRaftId
)
{
bool
votesResponded
(
SVotesRespond
*
pVotesRespond
,
const
SRaftId
*
pRaftId
)
{
bool
ret
=
false
;
bool
ret
=
false
;
for
(
int
i
=
0
;
i
<
pVotesRespond
->
replicaNum
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
pVotesRespond
->
replicaNum
;
++
i
)
{
if
(
syncUtilSameId
(
&
(
*
pVotesRespond
->
replicas
)[
i
],
pRaftId
)
&&
pVotesRespond
->
isRespond
[
i
])
{
if
(
syncUtilSameId
(
&
(
*
pVotesRespond
->
replicas
)[
i
],
pRaftId
)
&&
pVotesRespond
->
isRespond
[
i
])
{
ret
=
true
;
ret
=
true
;
break
;
break
;
...
@@ -183,7 +183,7 @@ void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *p
...
@@ -183,7 +183,7 @@ void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *p
return
;
return
;
}
}
for
(
int
i
=
0
;
i
<
pVotesRespond
->
replicaNum
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
pVotesRespond
->
replicaNum
;
++
i
)
{
if
(
syncUtilSameId
(
&
((
*
(
pVotesRespond
->
replicas
))[
i
]),
&
pMsg
->
srcId
))
{
if
(
syncUtilSameId
(
&
((
*
(
pVotesRespond
->
replicas
))[
i
]),
&
pMsg
->
srcId
))
{
// ASSERT(pVotesRespond->isRespond[i] == false);
// ASSERT(pVotesRespond->isRespond[i] == false);
pVotesRespond
->
isRespond
[
i
]
=
true
;
pVotesRespond
->
isRespond
[
i
]
=
true
;
...
@@ -197,7 +197,7 @@ void votesRespondReset(SVotesRespond *pVotesRespond, SyncTerm term) {
...
@@ -197,7 +197,7 @@ void votesRespondReset(SVotesRespond *pVotesRespond, SyncTerm term) {
pVotesRespond
->
term
=
term
;
pVotesRespond
->
term
=
term
;
memset
(
pVotesRespond
->
isRespond
,
0
,
sizeof
(
pVotesRespond
->
isRespond
));
memset
(
pVotesRespond
->
isRespond
,
0
,
sizeof
(
pVotesRespond
->
isRespond
));
/*
/*
for (int i = 0; i < pVotesRespond->replicaNum; ++i) {
for (int
32_t
i = 0; i < pVotesRespond->replicaNum; ++i) {
pVotesRespond->isRespond[i] = false;
pVotesRespond->isRespond[i] = false;
}
}
*/
*/
...
@@ -211,12 +211,12 @@ cJSON *votesRespond2Json(SVotesRespond *pVotesRespond) {
...
@@ -211,12 +211,12 @@ cJSON *votesRespond2Json(SVotesRespond *pVotesRespond) {
cJSON_AddNumberToObject
(
pRoot
,
"replicaNum"
,
pVotesRespond
->
replicaNum
);
cJSON_AddNumberToObject
(
pRoot
,
"replicaNum"
,
pVotesRespond
->
replicaNum
);
cJSON
*
pReplicas
=
cJSON_CreateArray
();
cJSON
*
pReplicas
=
cJSON_CreateArray
();
cJSON_AddItemToObject
(
pRoot
,
"replicas"
,
pReplicas
);
cJSON_AddItemToObject
(
pRoot
,
"replicas"
,
pReplicas
);
for
(
int
i
=
0
;
i
<
pVotesRespond
->
replicaNum
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
pVotesRespond
->
replicaNum
;
++
i
)
{
cJSON_AddItemToArray
(
pReplicas
,
syncUtilRaftId2Json
(
&
(
*
(
pVotesRespond
->
replicas
))[
i
]));
cJSON_AddItemToArray
(
pReplicas
,
syncUtilRaftId2Json
(
&
(
*
(
pVotesRespond
->
replicas
))[
i
]));
}
}
int
respondNum
=
0
;
int
32_t
respondNum
=
0
;
int
*
arr
=
(
int
*
)
taosMemoryMalloc
(
sizeof
(
in
t
)
*
pVotesRespond
->
replicaNum
);
int
32_t
*
arr
=
(
int32_t
*
)
taosMemoryMalloc
(
sizeof
(
int32_
t
)
*
pVotesRespond
->
replicaNum
);
for
(
int
i
=
0
;
i
<
pVotesRespond
->
replicaNum
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
pVotesRespond
->
replicaNum
;
++
i
)
{
arr
[
i
]
=
pVotesRespond
->
isRespond
[
i
];
arr
[
i
]
=
pVotesRespond
->
isRespond
[
i
];
if
(
pVotesRespond
->
isRespond
[
i
])
{
if
(
pVotesRespond
->
isRespond
[
i
])
{
respondNum
++
;
respondNum
++
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录