Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
46e55ba9
T
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
46e55ba9
编写于
11月 07, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refact: remove config change codes
上级
e060de70
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
56 addition
and
141 deletion
+56
-141
include/util/taoserror.h
include/util/taoserror.h
+1
-1
source/libs/sync/inc/syncInt.h
source/libs/sync/inc/syncInt.h
+0
-1
source/libs/sync/inc/syncTools.h
source/libs/sync/inc/syncTools.h
+0
-2
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+14
-113
source/libs/sync/src/syncRaftCfg.c
source/libs/sync/src/syncRaftCfg.c
+0
-12
source/libs/sync/src/syncUtil.c
source/libs/sync/src/syncUtil.c
+41
-12
未找到文件。
include/util/taoserror.h
浏览文件 @
46e55ba9
...
@@ -398,7 +398,7 @@ int32_t* taosGetErrno();
...
@@ -398,7 +398,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_SYN_NOT_LEADER TAOS_DEF_ERROR_CODE(0, 0x090C)
#define TSDB_CODE_SYN_NOT_LEADER TAOS_DEF_ERROR_CODE(0, 0x090C)
#define TSDB_CODE_SYN_ONE_REPLICA TAOS_DEF_ERROR_CODE(0, 0x090D)
#define TSDB_CODE_SYN_ONE_REPLICA TAOS_DEF_ERROR_CODE(0, 0x090D)
#define TSDB_CODE_SYN_NOT_IN_NEW_CONFIG TAOS_DEF_ERROR_CODE(0, 0x090E)
#define TSDB_CODE_SYN_NOT_IN_NEW_CONFIG TAOS_DEF_ERROR_CODE(0, 0x090E)
#define TSDB_CODE_SYN_NEW_CONFIG_ERROR TAOS_DEF_ERROR_CODE(0, 0x090F)
#define TSDB_CODE_SYN_NEW_CONFIG_ERROR TAOS_DEF_ERROR_CODE(0, 0x090F)
// internal
#define TSDB_CODE_SYN_RECONFIG_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0910)
#define TSDB_CODE_SYN_RECONFIG_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0910)
#define TSDB_CODE_SYN_PROPOSE_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0911)
#define TSDB_CODE_SYN_PROPOSE_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0911)
#define TSDB_CODE_SYN_STANDBY_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0912)
#define TSDB_CODE_SYN_STANDBY_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0912)
...
...
source/libs/sync/inc/syncInt.h
浏览文件 @
46e55ba9
...
@@ -278,7 +278,6 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode);
...
@@ -278,7 +278,6 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode);
bool
syncNodeInConfig
(
SSyncNode
*
pSyncNode
,
const
SSyncCfg
*
config
);
bool
syncNodeInConfig
(
SSyncNode
*
pSyncNode
,
const
SSyncCfg
*
config
);
void
syncNodeDoConfigChange
(
SSyncNode
*
pSyncNode
,
SSyncCfg
*
newConfig
,
SyncIndex
lastConfigChangeIndex
);
void
syncNodeDoConfigChange
(
SSyncNode
*
pSyncNode
,
SSyncCfg
*
newConfig
,
SyncIndex
lastConfigChangeIndex
);
SyncIndex
syncMinMatchIndex
(
SSyncNode
*
pSyncNode
);
SyncIndex
syncMinMatchIndex
(
SSyncNode
*
pSyncNode
);
char
*
syncNodePeerState2Str
(
const
SSyncNode
*
pSyncNode
);
// raft state change --------------
// raft state change --------------
void
syncNodeUpdateTerm
(
SSyncNode
*
pSyncNode
,
SyncTerm
term
);
void
syncNodeUpdateTerm
(
SSyncNode
*
pSyncNode
,
SyncTerm
term
);
...
...
source/libs/sync/inc/syncTools.h
浏览文件 @
46e55ba9
...
@@ -26,8 +26,6 @@ typedef struct SRaftId {
...
@@ -26,8 +26,6 @@ typedef struct SRaftId {
SyncGroupId
vgId
;
SyncGroupId
vgId
;
}
SRaftId
;
}
SRaftId
;
char
*
sync2SimpleStr
(
int64_t
rid
);
// for compatibility, the same as syncPropose
// for compatibility, the same as syncPropose
int32_t
syncForwardToPeer
(
int64_t
rid
,
SRpcMsg
*
pMsg
,
bool
isWeak
);
int32_t
syncForwardToPeer
(
int64_t
rid
,
SRpcMsg
*
pMsg
,
bool
isWeak
);
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
46e55ba9
...
@@ -83,11 +83,10 @@ void syncStop(int64_t rid) {
...
@@ -83,11 +83,10 @@ void syncStop(int64_t rid) {
void
syncPreStop
(
int64_t
rid
)
{
void
syncPreStop
(
int64_t
rid
)
{
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
rid
);
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
rid
);
if
(
pSyncNode
==
NULL
)
return
;
if
(
pSyncNode
!=
NULL
)
{
syncNodePreClose
(
pSyncNode
);
syncNodePreClose
(
pSyncNode
);
syncNodeRelease
(
pSyncNode
);
}
syncNodeRelease
(
pSyncNode
);
}
}
static
bool
syncNodeCheckNewConfig
(
SSyncNode
*
pSyncNode
,
const
SSyncCfg
*
pCfg
)
{
static
bool
syncNodeCheckNewConfig
(
SSyncNode
*
pSyncNode
,
const
SSyncCfg
*
pCfg
)
{
...
@@ -217,48 +216,10 @@ SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
...
@@ -217,48 +216,10 @@ SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
return
minMatchIndex
;
return
minMatchIndex
;
}
}
char
*
syncNodePeerState2Str
(
const
SSyncNode
*
pSyncNode
)
{
int32_t
len
=
128
;
int32_t
useLen
=
0
;
int32_t
leftLen
=
len
-
useLen
;
char
*
pStr
=
taosMemoryMalloc
(
len
);
memset
(
pStr
,
0
,
len
);
char
*
p
=
pStr
;
int32_t
use
=
snprintf
(
p
,
leftLen
,
"{"
);
useLen
+=
use
;
leftLen
-=
use
;
for
(
int32_t
i
=
0
;
i
<
pSyncNode
->
replicaNum
;
++
i
)
{
SPeerState
*
pState
=
syncNodeGetPeerState
((
SSyncNode
*
)
pSyncNode
,
&
(
pSyncNode
->
replicasId
[
i
]));
if
(
pState
==
NULL
)
{
sError
(
"vgId:%d, replica maybe dropped"
,
pSyncNode
->
vgId
);
break
;
}
p
=
pStr
+
useLen
;
use
=
snprintf
(
p
,
leftLen
,
"%d:%"
PRId64
" ,%"
PRId64
,
i
,
pState
->
lastSendIndex
,
pState
->
lastSendTime
);
useLen
+=
use
;
leftLen
-=
use
;
}
p
=
pStr
+
useLen
;
use
=
snprintf
(
p
,
leftLen
,
"}"
);
useLen
+=
use
;
leftLen
-=
use
;
// sTrace("vgId:%d, ------------------ syncNodePeerState2Str:%s", pSyncNode->vgId, pStr);
return
pStr
;
}
int32_t
syncBeginSnapshot
(
int64_t
rid
,
int64_t
lastApplyIndex
)
{
int32_t
syncBeginSnapshot
(
int64_t
rid
,
int64_t
lastApplyIndex
)
{
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
rid
);
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
rid
);
if
(
pSyncNode
==
NULL
)
{
if
(
pSyncNode
==
NULL
)
return
-
1
;
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
ASSERT
(
rid
==
pSyncNode
->
rid
);
int32_t
code
=
0
;
int32_t
code
=
0
;
if
(
syncNodeIsMnode
(
pSyncNode
))
{
if
(
syncNodeIsMnode
(
pSyncNode
))
{
...
@@ -366,19 +327,14 @@ _DEL_WAL:
...
@@ -366,19 +327,14 @@ _DEL_WAL:
int32_t
syncEndSnapshot
(
int64_t
rid
)
{
int32_t
syncEndSnapshot
(
int64_t
rid
)
{
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
rid
);
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
rid
);
if
(
pSyncNode
==
NULL
)
{
if
(
pSyncNode
==
NULL
)
return
-
1
;
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
ASSERT
(
rid
==
pSyncNode
->
rid
);
int32_t
code
=
0
;
int32_t
code
=
0
;
if
(
atomic_load_64
(
&
pSyncNode
->
snapshottingIndex
)
!=
SYNC_INDEX_INVALID
)
{
if
(
atomic_load_64
(
&
pSyncNode
->
snapshottingIndex
)
!=
SYNC_INDEX_INVALID
)
{
SSyncLogStoreData
*
pData
=
pSyncNode
->
pLogStore
->
data
;
SSyncLogStoreData
*
pData
=
pSyncNode
->
pLogStore
->
data
;
code
=
walEndSnapshot
(
pData
->
pWal
);
code
=
walEndSnapshot
(
pData
->
pWal
);
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
sError
(
"vgId:%d, wal snapshot end error since:%s"
,
pSyncNode
->
vgId
,
terrstr
());
sNError
(
pSyncNode
,
"wal snapshot end error since:%s"
,
terrstr
());
syncNodeRelease
(
pSyncNode
);
syncNodeRelease
(
pSyncNode
);
return
-
1
;
return
-
1
;
}
else
{
}
else
{
...
@@ -393,25 +349,16 @@ int32_t syncEndSnapshot(int64_t rid) {
...
@@ -393,25 +349,16 @@ int32_t syncEndSnapshot(int64_t rid) {
int32_t
syncStepDown
(
int64_t
rid
,
SyncTerm
newTerm
)
{
int32_t
syncStepDown
(
int64_t
rid
,
SyncTerm
newTerm
)
{
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
rid
);
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
rid
);
if
(
pSyncNode
==
NULL
)
{
if
(
pSyncNode
==
NULL
)
return
-
1
;
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
ASSERT
(
rid
==
pSyncNode
->
rid
);
syncNodeStepDown
(
pSyncNode
,
newTerm
);
syncNodeStepDown
(
pSyncNode
,
newTerm
);
syncNodeRelease
(
pSyncNode
);
syncNodeRelease
(
pSyncNode
);
return
0
;
return
0
;
}
}
bool
syncIsReadyForRead
(
int64_t
rid
)
{
bool
syncIsReadyForRead
(
int64_t
rid
)
{
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
rid
);
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
rid
);
if
(
pSyncNode
==
NULL
)
{
if
(
pSyncNode
==
NULL
)
return
-
1
;
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
false
;
}
ASSERT
(
rid
==
pSyncNode
->
rid
);
if
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
&&
pSyncNode
->
restoreFinish
)
{
if
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
&&
pSyncNode
->
restoreFinish
)
{
syncNodeRelease
(
pSyncNode
);
syncNodeRelease
(
pSyncNode
);
...
@@ -428,7 +375,7 @@ bool syncIsReadyForRead(int64_t rid) {
...
@@ -428,7 +375,7 @@ bool syncIsReadyForRead(int64_t rid) {
if
(
!
pSyncNode
->
pLogStore
->
syncLogIsEmpty
(
pSyncNode
->
pLogStore
))
{
if
(
!
pSyncNode
->
pLogStore
->
syncLogIsEmpty
(
pSyncNode
->
pLogStore
))
{
SSyncRaftEntry
*
pEntry
=
NULL
;
SSyncRaftEntry
*
pEntry
=
NULL
;
int32_t
code
=
pSyncNode
->
pLogStore
->
syncLogGetEntry
(
int32_t
code
=
pSyncNode
->
pLogStore
->
syncLogGetEntry
(
pSyncNode
->
pLogStore
,
pSyncNode
->
pLogStore
->
syncLogLastIndex
(
pSyncNode
->
pLogStore
),
&
pEntry
);
pSyncNode
->
pLogStore
,
pSyncNode
->
pLogStore
->
syncLogLastIndex
(
pSyncNode
->
pLogStore
),
&
pEntry
);
if
(
code
==
0
&&
pEntry
!=
NULL
)
{
if
(
code
==
0
&&
pEntry
!=
NULL
)
{
if
(
pEntry
->
originalRpcType
==
TDMT_SYNC_NOOP
&&
pEntry
->
term
==
pSyncNode
->
pRaftStore
->
currentTerm
)
{
if
(
pEntry
->
originalRpcType
==
TDMT_SYNC_NOOP
&&
pEntry
->
term
==
pSyncNode
->
pRaftStore
->
currentTerm
)
{
ready
=
true
;
ready
=
true
;
...
@@ -469,8 +416,6 @@ int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
...
@@ -469,8 +416,6 @@ int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
}
}
int32_t
syncNodeLeaderTransferTo
(
SSyncNode
*
pSyncNode
,
SNodeInfo
newLeader
)
{
int32_t
syncNodeLeaderTransferTo
(
SSyncNode
*
pSyncNode
,
SNodeInfo
newLeader
)
{
int32_t
ret
=
0
;
if
(
pSyncNode
->
replicaNum
==
1
)
{
if
(
pSyncNode
->
replicaNum
==
1
)
{
sDebug
(
"only one replica, cannot leader transfer"
);
sDebug
(
"only one replica, cannot leader transfer"
);
terrno
=
TSDB_CODE_SYN_ONE_REPLICA
;
terrno
=
TSDB_CODE_SYN_ONE_REPLICA
;
...
@@ -488,42 +433,10 @@ int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
...
@@ -488,42 +433,10 @@ int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
syncLeaderTransfer2RpcMsg
(
pMsg
,
&
rpcMsg
);
syncLeaderTransfer2RpcMsg
(
pMsg
,
&
rpcMsg
);
syncLeaderTransferDestroy
(
pMsg
);
syncLeaderTransferDestroy
(
pMsg
);
ret
=
syncNodePropose
(
pSyncNode
,
&
rpcMsg
,
false
);
int32_t
ret
=
syncNodePropose
(
pSyncNode
,
&
rpcMsg
,
false
);
return
ret
;
return
ret
;
}
}
bool
syncCanLeaderTransfer
(
int64_t
rid
)
{
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
rid
);
if
(
pSyncNode
==
NULL
)
{
return
false
;
}
ASSERT
(
rid
==
pSyncNode
->
rid
);
if
(
pSyncNode
->
replicaNum
==
1
)
{
syncNodeRelease
(
pSyncNode
);
return
false
;
}
if
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_FOLLOWER
)
{
syncNodeRelease
(
pSyncNode
);
return
true
;
}
bool
matchOK
=
true
;
if
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_CANDIDATE
||
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
)
{
SyncIndex
myCommitIndex
=
pSyncNode
->
commitIndex
;
for
(
int
i
=
0
;
i
<
pSyncNode
->
peersNum
;
++
i
)
{
SyncIndex
peerMatchIndex
=
syncIndexMgrGetIndex
(
pSyncNode
->
pMatchIndex
,
&
(
pSyncNode
->
peersId
)[
i
]);
if
(
peerMatchIndex
<
myCommitIndex
)
{
matchOK
=
false
;
}
}
}
syncNodeRelease
(
pSyncNode
);
return
matchOK
;
}
int32_t
syncForwardToPeer
(
int64_t
rid
,
SRpcMsg
*
pMsg
,
bool
isWeak
)
{
int32_t
syncForwardToPeer
(
int64_t
rid
,
SRpcMsg
*
pMsg
,
bool
isWeak
)
{
int32_t
ret
=
syncPropose
(
rid
,
pMsg
,
isWeak
);
int32_t
ret
=
syncPropose
(
rid
,
pMsg
,
isWeak
);
return
ret
;
return
ret
;
...
@@ -706,7 +619,7 @@ void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
...
@@ -706,7 +619,7 @@ void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
memset
(
pEpSet
,
0
,
sizeof
(
*
pEpSet
));
memset
(
pEpSet
,
0
,
sizeof
(
*
pEpSet
));
return
;
return
;
}
}
ASSERT
(
rid
==
pSyncNode
->
rid
);
pEpSet
->
numOfEps
=
0
;
pEpSet
->
numOfEps
=
0
;
for
(
int
i
=
0
;
i
<
pSyncNode
->
pRaftCfg
->
cfg
.
replicaNum
;
++
i
)
{
for
(
int
i
=
0
;
i
<
pSyncNode
->
pRaftCfg
->
cfg
.
replicaNum
;
++
i
)
{
snprintf
(
pEpSet
->
eps
[
i
].
fqdn
,
sizeof
(
pEpSet
->
eps
[
i
].
fqdn
),
"%s"
,
(
pSyncNode
->
pRaftCfg
->
cfg
.
nodeInfo
)[
i
].
nodeFqdn
);
snprintf
(
pEpSet
->
eps
[
i
].
fqdn
,
sizeof
(
pEpSet
->
eps
[
i
].
fqdn
),
"%s"
,
(
pSyncNode
->
pRaftCfg
->
cfg
.
nodeInfo
)[
i
].
nodeFqdn
);
...
@@ -722,6 +635,7 @@ void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
...
@@ -722,6 +635,7 @@ void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
syncNodeRelease
(
pSyncNode
);
syncNodeRelease
(
pSyncNode
);
}
}
static
void
syncGetAndDelRespRpc
(
SSyncNode
*
pSyncNode
,
uint64_t
index
,
SRpcHandleInfo
*
pInfo
)
{
static
void
syncGetAndDelRespRpc
(
SSyncNode
*
pSyncNode
,
uint64_t
index
,
SRpcHandleInfo
*
pInfo
)
{
SRespStub
stub
;
SRespStub
stub
;
int32_t
ret
=
syncRespMgrGetAndDel
(
pSyncNode
->
pSyncRespMgr
,
index
,
&
stub
);
int32_t
ret
=
syncRespMgrGetAndDel
(
pSyncNode
->
pSyncRespMgr
,
index
,
&
stub
);
...
@@ -732,19 +646,6 @@ static void syncGetAndDelRespRpc(SSyncNode* pSyncNode, uint64_t index, SRpcHandl
...
@@ -732,19 +646,6 @@ static void syncGetAndDelRespRpc(SSyncNode* pSyncNode, uint64_t index, SRpcHandl
sTrace
(
"vgId:%d, get seq:%"
PRIu64
" rpc handle:%p"
,
pSyncNode
->
vgId
,
index
,
pInfo
->
handle
);
sTrace
(
"vgId:%d, get seq:%"
PRIu64
" rpc handle:%p"
,
pSyncNode
->
vgId
,
index
,
pInfo
->
handle
);
}
}
char
*
sync2SimpleStr
(
int64_t
rid
)
{
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
rid
);
if
(
pSyncNode
==
NULL
)
{
sTrace
(
"syncSetRpc get pSyncNode is NULL, rid:%"
PRId64
,
rid
);
return
NULL
;
}
char
*
s
=
syncNode2SimpleStr
(
pSyncNode
);
syncNodeRelease
(
pSyncNode
);
return
s
;
}
int32_t
syncPropose
(
int64_t
rid
,
SRpcMsg
*
pMsg
,
bool
isWeak
)
{
int32_t
syncPropose
(
int64_t
rid
,
SRpcMsg
*
pMsg
,
bool
isWeak
)
{
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
rid
);
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
rid
);
if
(
pSyncNode
==
NULL
)
return
-
1
;
if
(
pSyncNode
==
NULL
)
return
-
1
;
...
...
source/libs/sync/src/syncRaftCfg.c
浏览文件 @
46e55ba9
...
@@ -249,18 +249,6 @@ char *syncCfg2Str(SSyncCfg *pSyncCfg) {
...
@@ -249,18 +249,6 @@ char *syncCfg2Str(SSyncCfg *pSyncCfg) {
return
serialized
;
return
serialized
;
}
}
void
syncCfg2SimpleStr
(
const
SSyncCfg
*
pCfg
,
char
*
buf
,
int32_t
bufLen
)
{
int32_t
len
=
snprintf
(
buf
,
bufLen
,
"{r-num:%d, my:%d, "
,
pCfg
->
replicaNum
,
pCfg
->
myIndex
);
for
(
int32_t
i
=
0
;
i
<
pCfg
->
replicaNum
;
++
i
)
{
if
(
i
<
pCfg
->
replicaNum
-
1
)
{
len
+=
snprintf
(
buf
+
len
,
bufLen
-
len
,
"%s:%d, "
,
pCfg
->
nodeInfo
[
i
].
nodeFqdn
,
pCfg
->
nodeInfo
[
i
].
nodePort
);
}
else
{
len
+=
snprintf
(
buf
+
len
,
bufLen
-
len
,
"%s:%d}"
,
pCfg
->
nodeInfo
[
i
].
nodeFqdn
,
pCfg
->
nodeInfo
[
i
].
nodePort
);
}
}
}
int32_t
syncCfgFromJson
(
const
cJSON
*
pRoot
,
SSyncCfg
*
pSyncCfg
)
{
int32_t
syncCfgFromJson
(
const
cJSON
*
pRoot
,
SSyncCfg
*
pSyncCfg
)
{
memset
(
pSyncCfg
,
0
,
sizeof
(
SSyncCfg
));
memset
(
pSyncCfg
,
0
,
sizeof
(
SSyncCfg
));
// cJSON *pJson = cJSON_GetObjectItem(pRoot, "SSyncCfg");
// cJSON *pJson = cJSON_GetObjectItem(pRoot, "SSyncCfg");
...
...
source/libs/sync/src/syncUtil.c
浏览文件 @
46e55ba9
...
@@ -202,6 +202,35 @@ bool syncUtilUserRollback(tmsg_t msgType) {
...
@@ -202,6 +202,35 @@ bool syncUtilUserRollback(tmsg_t msgType) {
return
false
;
return
false
;
}
}
void
syncCfg2SimpleStr
(
const
SSyncCfg
*
pCfg
,
char
*
buf
,
int32_t
bufLen
)
{
int32_t
len
=
snprintf
(
buf
,
bufLen
,
"{r-num:%d, my:%d, "
,
pCfg
->
replicaNum
,
pCfg
->
myIndex
);
for
(
int32_t
i
=
0
;
i
<
pCfg
->
replicaNum
;
++
i
)
{
if
(
i
<
pCfg
->
replicaNum
-
1
)
{
len
+=
snprintf
(
buf
+
len
,
bufLen
-
len
,
"%s:%d, "
,
pCfg
->
nodeInfo
[
i
].
nodeFqdn
,
pCfg
->
nodeInfo
[
i
].
nodePort
);
}
else
{
len
+=
snprintf
(
buf
+
len
,
bufLen
-
len
,
"%s:%d}"
,
pCfg
->
nodeInfo
[
i
].
nodeFqdn
,
pCfg
->
nodeInfo
[
i
].
nodePort
);
}
}
}
static
void
syncPeerState2Str
(
SSyncNode
*
pSyncNode
,
char
*
buf
,
int32_t
bufLen
)
{
int32_t
len
=
1
;
for
(
int32_t
i
=
0
;
i
<
pSyncNode
->
replicaNum
;
++
i
)
{
SPeerState
*
pState
=
syncNodeGetPeerState
(
pSyncNode
,
&
(
pSyncNode
->
replicasId
[
i
]));
if
(
pState
==
NULL
)
break
;
if
(
i
<
pSyncNode
->
replicaNum
-
1
)
{
len
+=
snprintf
(
buf
+
len
,
bufLen
-
len
,
"%d:%"
PRId64
" %"
PRId64
", "
,
i
,
pState
->
lastSendIndex
,
pState
->
lastSendTime
);
}
else
{
len
+=
snprintf
(
buf
+
len
,
bufLen
-
len
,
"%d:%"
PRId64
" %"
PRId64
"}"
,
i
,
pState
->
lastSendIndex
,
pState
->
lastSendTime
);
}
}
}
void
syncPrintNodeLog
(
const
char
*
flags
,
ELogLevel
level
,
int32_t
dflag
,
SSyncNode
*
pNode
,
const
char
*
format
,
...)
{
void
syncPrintNodeLog
(
const
char
*
flags
,
ELogLevel
level
,
int32_t
dflag
,
SSyncNode
*
pNode
,
const
char
*
format
,
...)
{
if
(
pNode
==
NULL
||
pNode
->
pRaftCfg
!=
NULL
&&
pNode
->
pRaftStore
==
NULL
||
pNode
->
pLogStore
==
NULL
)
return
;
if
(
pNode
==
NULL
||
pNode
->
pRaftCfg
!=
NULL
&&
pNode
->
pRaftStore
==
NULL
||
pNode
->
pLogStore
==
NULL
)
return
;
...
@@ -220,7 +249,9 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
...
@@ -220,7 +249,9 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
char
cfgStr
[
1024
];
char
cfgStr
[
1024
];
syncCfg2SimpleStr
(
&
(
pNode
->
pRaftCfg
->
cfg
),
cfgStr
,
sizeof
(
cfgStr
));
syncCfg2SimpleStr
(
&
(
pNode
->
pRaftCfg
->
cfg
),
cfgStr
,
sizeof
(
cfgStr
));
char
*
pPeerStateStr
=
syncNodePeerState2Str
(
pNode
);
char
peerStr
[
1024
]
=
"{"
;
syncPeerState2Str
(
pNode
,
peerStr
,
sizeof
(
peerStr
));
int32_t
quorum
=
syncNodeDynamicQuorum
(
pNode
);
int32_t
quorum
=
syncNodeDynamicQuorum
(
pNode
);
char
eventLog
[
512
];
// {0};
char
eventLog
[
512
];
// {0};
...
@@ -239,9 +270,7 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
...
@@ -239,9 +270,7 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
logBeginIndex
,
logLastIndex
,
pNode
->
minMatchIndex
,
snapshot
.
lastApplyIndex
,
snapshot
.
lastApplyTerm
,
logBeginIndex
,
logLastIndex
,
pNode
->
minMatchIndex
,
snapshot
.
lastApplyIndex
,
snapshot
.
lastApplyTerm
,
pNode
->
pRaftCfg
->
isStandBy
,
pNode
->
pRaftCfg
->
snapshotStrategy
,
pNode
->
pRaftCfg
->
batchSize
,
pNode
->
pRaftCfg
->
isStandBy
,
pNode
->
pRaftCfg
->
snapshotStrategy
,
pNode
->
pRaftCfg
->
batchSize
,
pNode
->
replicaNum
,
pNode
->
pRaftCfg
->
lastConfigIndex
,
pNode
->
changing
,
pNode
->
restoreFinish
,
quorum
,
pNode
->
replicaNum
,
pNode
->
pRaftCfg
->
lastConfigIndex
,
pNode
->
changing
,
pNode
->
restoreFinish
,
quorum
,
pNode
->
electTimerLogicClock
,
pNode
->
heartbeatTimerLogicClockUser
,
pPeerStateStr
,
cfgStr
);
pNode
->
electTimerLogicClock
,
pNode
->
heartbeatTimerLogicClockUser
,
peerStr
,
cfgStr
);
taosMemoryFree
(
pPeerStateStr
);
}
}
void
syncPrintSnapshotSenderLog
(
const
char
*
flags
,
ELogLevel
level
,
int32_t
dflag
,
SSyncSnapshotSender
*
pSender
,
void
syncPrintSnapshotSenderLog
(
const
char
*
flags
,
ELogLevel
level
,
int32_t
dflag
,
SSyncSnapshotSender
*
pSender
,
...
@@ -264,7 +293,9 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla
...
@@ -264,7 +293,9 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla
char
cfgStr
[
1024
];
char
cfgStr
[
1024
];
syncCfg2SimpleStr
(
&
(
pNode
->
pRaftCfg
->
cfg
),
cfgStr
,
sizeof
(
cfgStr
));
syncCfg2SimpleStr
(
&
(
pNode
->
pRaftCfg
->
cfg
),
cfgStr
,
sizeof
(
cfgStr
));
char
*
pPeerStateStr
=
syncNodePeerState2Str
(
pNode
);
char
peerStr
[
1024
]
=
"{"
;
syncPeerState2Str
(
pNode
,
peerStr
,
sizeof
(
peerStr
));
int32_t
quorum
=
syncNodeDynamicQuorum
(
pNode
);
int32_t
quorum
=
syncNodeDynamicQuorum
(
pNode
);
SRaftId
destId
=
pNode
->
replicasId
[
pSender
->
replicaIndex
];
SRaftId
destId
=
pNode
->
replicasId
[
pSender
->
replicaIndex
];
char
host
[
64
];
char
host
[
64
];
...
@@ -291,9 +322,7 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla
...
@@ -291,9 +322,7 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla
pNode
->
minMatchIndex
,
snapshot
.
lastApplyIndex
,
snapshot
.
lastApplyTerm
,
pNode
->
pRaftCfg
->
isStandBy
,
pNode
->
minMatchIndex
,
snapshot
.
lastApplyIndex
,
snapshot
.
lastApplyTerm
,
pNode
->
pRaftCfg
->
isStandBy
,
pNode
->
pRaftCfg
->
snapshotStrategy
,
pNode
->
pRaftCfg
->
batchSize
,
pNode
->
replicaNum
,
pNode
->
pRaftCfg
->
snapshotStrategy
,
pNode
->
pRaftCfg
->
batchSize
,
pNode
->
replicaNum
,
pNode
->
pRaftCfg
->
lastConfigIndex
,
pNode
->
changing
,
pNode
->
restoreFinish
,
quorum
,
pNode
->
pRaftCfg
->
lastConfigIndex
,
pNode
->
changing
,
pNode
->
restoreFinish
,
quorum
,
pNode
->
electTimerLogicClock
,
pNode
->
heartbeatTimerLogicClockUser
,
pPeerStateStr
,
cfgStr
);
pNode
->
electTimerLogicClock
,
pNode
->
heartbeatTimerLogicClockUser
,
peerStr
,
cfgStr
);
taosMemoryFree
(
pPeerStateStr
);
}
}
void
syncPrintSnapshotReceiverLog
(
const
char
*
flags
,
ELogLevel
level
,
int32_t
dflag
,
SSyncSnapshotReceiver
*
pReceiver
,
void
syncPrintSnapshotReceiverLog
(
const
char
*
flags
,
ELogLevel
level
,
int32_t
dflag
,
SSyncSnapshotReceiver
*
pReceiver
,
...
@@ -316,7 +345,9 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df
...
@@ -316,7 +345,9 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df
char
cfgStr
[
1024
];
char
cfgStr
[
1024
];
syncCfg2SimpleStr
(
&
(
pNode
->
pRaftCfg
->
cfg
),
cfgStr
,
sizeof
(
cfgStr
));
syncCfg2SimpleStr
(
&
(
pNode
->
pRaftCfg
->
cfg
),
cfgStr
,
sizeof
(
cfgStr
));
char
*
pPeerStateStr
=
syncNodePeerState2Str
(
pNode
);
char
peerStr
[
1024
]
=
"{"
;
syncPeerState2Str
(
pNode
,
peerStr
,
sizeof
(
peerStr
));
int32_t
quorum
=
syncNodeDynamicQuorum
(
pNode
);
int32_t
quorum
=
syncNodeDynamicQuorum
(
pNode
);
SRaftId
fromId
=
pReceiver
->
fromId
;
SRaftId
fromId
=
pReceiver
->
fromId
;
char
host
[
128
];
char
host
[
128
];
...
@@ -344,7 +375,5 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df
...
@@ -344,7 +375,5 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df
logLastIndex
,
pNode
->
minMatchIndex
,
snapshot
.
lastApplyIndex
,
snapshot
.
lastApplyTerm
,
logLastIndex
,
pNode
->
minMatchIndex
,
snapshot
.
lastApplyIndex
,
snapshot
.
lastApplyTerm
,
pNode
->
pRaftCfg
->
isStandBy
,
pNode
->
pRaftCfg
->
snapshotStrategy
,
pNode
->
pRaftCfg
->
batchSize
,
pNode
->
pRaftCfg
->
isStandBy
,
pNode
->
pRaftCfg
->
snapshotStrategy
,
pNode
->
pRaftCfg
->
batchSize
,
pNode
->
replicaNum
,
pNode
->
pRaftCfg
->
lastConfigIndex
,
pNode
->
changing
,
pNode
->
restoreFinish
,
quorum
,
pNode
->
replicaNum
,
pNode
->
pRaftCfg
->
lastConfigIndex
,
pNode
->
changing
,
pNode
->
restoreFinish
,
quorum
,
pNode
->
electTimerLogicClock
,
pNode
->
heartbeatTimerLogicClockUser
,
pPeerStateStr
,
cfgStr
);
pNode
->
electTimerLogicClock
,
pNode
->
heartbeatTimerLogicClockUser
,
peerStr
,
cfgStr
);
taosMemoryFree
(
pPeerStateStr
);
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录