Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
11a4ef35
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
11a4ef35
编写于
10月 21, 2022
作者:
M
Minghao Li
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'feature/sync2-merge' of
https://github.com/taosdata/TDengine
into feature/sync2-merge
上级
cf3d865b
92584d13
变更
3
显示空白变更内容
内联
并排
Showing
3 changed file
with
41 addition
and
29 deletion
+41
-29
source/dnode/mnode/impl/src/mndSync.c
source/dnode/mnode/impl/src/mndSync.c
+18
-28
source/dnode/mnode/impl/src/mndTrans.c
source/dnode/mnode/impl/src/mndTrans.c
+1
-1
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+22
-0
未找到文件。
source/dnode/mnode/impl/src/mndSync.c
浏览文件 @
11a4ef35
...
@@ -71,8 +71,8 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
...
@@ -71,8 +71,8 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
mInfo
(
"trans:%d, is proposed and post sem"
,
transId
);
mInfo
(
"trans:%d, is proposed and post sem"
,
transId
);
}
}
pMgmt
->
transId
=
0
;
pMgmt
->
transId
=
0
;
taosWUnLockLatch
(
&
pMgmt
->
lock
);
tsem_post
(
&
pMgmt
->
syncSem
);
tsem_post
(
&
pMgmt
->
syncSem
);
taosWUnLockLatch
(
&
pMgmt
->
lock
);
}
else
{
}
else
{
taosWUnLockLatch
(
&
pMgmt
->
lock
);
taosWUnLockLatch
(
&
pMgmt
->
lock
);
STrans
*
pTrans
=
mndAcquireTrans
(
pMnode
,
transId
);
STrans
*
pTrans
=
mndAcquireTrans
(
pMnode
,
transId
);
...
@@ -113,27 +113,7 @@ void mndRestoreFinish(struct SSyncFSM *pFsm) {
...
@@ -113,27 +113,7 @@ void mndRestoreFinish(struct SSyncFSM *pFsm) {
}
}
}
}
void
mndReConfig
(
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
SReConfigCbMeta
*
cbMeta
)
{
void
mndReConfig
(
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
SReConfigCbMeta
*
cbMeta
)
{}
SMnode
*
pMnode
=
pFsm
->
data
;
SSyncMgmt
*
pMgmt
=
&
pMnode
->
syncMgmt
;
pMgmt
->
errCode
=
cbMeta
->
code
;
mInfo
(
"trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%"
PRId64
" term:%"
PRId64
,
pMgmt
->
transId
,
cbMeta
->
code
,
cbMeta
->
index
,
cbMeta
->
term
);
taosWLockLatch
(
&
pMgmt
->
lock
);
if
(
pMgmt
->
transId
==
-
1
)
{
if
(
pMgmt
->
errCode
!=
0
)
{
mError
(
"trans:-1, failed to propose sync reconfig since %s, post sem"
,
tstrerror
(
pMgmt
->
errCode
));
}
else
{
mInfo
(
"trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%"
PRId64
" term:%"
PRId64
" post sem"
,
pMgmt
->
transId
,
cbMeta
->
code
,
cbMeta
->
index
,
cbMeta
->
term
);
}
pMgmt
->
transId
=
0
;
tsem_post
(
&
pMgmt
->
syncSem
);
}
taosWUnLockLatch
(
&
pMgmt
->
lock
);
}
int32_t
mndSnapshotStartRead
(
struct
SSyncFSM
*
pFsm
,
void
*
pParam
,
void
**
ppReader
)
{
int32_t
mndSnapshotStartRead
(
struct
SSyncFSM
*
pFsm
,
void
*
pParam
,
void
**
ppReader
)
{
mInfo
(
"start to read snapshot from sdb"
);
mInfo
(
"start to read snapshot from sdb"
);
...
@@ -179,11 +159,14 @@ void mndLeaderTransfer(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cb
...
@@ -179,11 +159,14 @@ void mndLeaderTransfer(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cb
static
void
mndBecomeFollower
(
struct
SSyncFSM
*
pFsm
)
{
static
void
mndBecomeFollower
(
struct
SSyncFSM
*
pFsm
)
{
SMnode
*
pMnode
=
pFsm
->
data
;
SMnode
*
pMnode
=
pFsm
->
data
;
mInfo
(
"vgId:1, become follower
and post sem
"
);
mInfo
(
"vgId:1, become follower"
);
taosWLockLatch
(
&
pMnode
->
syncMgmt
.
lock
);
taosWLockLatch
(
&
pMnode
->
syncMgmt
.
lock
);
if
(
pMnode
->
syncMgmt
.
transId
!=
0
)
{
if
(
pMnode
->
syncMgmt
.
transId
!=
0
)
{
mInfo
(
"vgId:1, become follower and post sem, trans:%d, failed to propose since not leader"
,
pMnode
->
syncMgmt
.
transId
);
pMnode
->
syncMgmt
.
transId
=
0
;
pMnode
->
syncMgmt
.
transId
=
0
;
pMnode
->
syncMgmt
.
errCode
=
TSDB_CODE_SYN_NOT_LEADER
;
tsem_post
(
&
pMnode
->
syncMgmt
.
syncSem
);
tsem_post
(
&
pMnode
->
syncMgmt
.
syncSem
);
}
}
taosWUnLockLatch
(
&
pMnode
->
syncMgmt
.
lock
);
taosWUnLockLatch
(
&
pMnode
->
syncMgmt
.
lock
);
...
@@ -292,6 +275,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
...
@@ -292,6 +275,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
int32_t
code
=
syncPropose
(
pMgmt
->
sync
,
&
req
,
isWeak
);
int32_t
code
=
syncPropose
(
pMgmt
->
sync
,
&
req
,
isWeak
);
if
(
code
==
0
)
{
if
(
code
==
0
)
{
mInfo
(
"trans:%d, is proposing and wait sem"
,
pMgmt
->
transId
);
tsem_wait
(
&
pMgmt
->
syncSem
);
tsem_wait
(
&
pMgmt
->
syncSem
);
}
else
if
(
code
>
0
)
{
}
else
if
(
code
>
0
)
{
mInfo
(
"trans:%d, confirm at once since replica is 1, continue execute"
,
transId
);
mInfo
(
"trans:%d, confirm at once since replica is 1, continue execute"
,
transId
);
...
@@ -301,13 +285,17 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
...
@@ -301,13 +285,17 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
sdbWriteWithoutFree
(
pMnode
->
pSdb
,
pRaw
);
sdbWriteWithoutFree
(
pMnode
->
pSdb
,
pRaw
);
sdbSetApplyInfo
(
pMnode
->
pSdb
,
req
.
info
.
conn
.
applyIndex
,
req
.
info
.
conn
.
applyTerm
,
SYNC_INDEX_INVALID
);
sdbSetApplyInfo
(
pMnode
->
pSdb
,
req
.
info
.
conn
.
applyIndex
,
req
.
info
.
conn
.
applyTerm
,
SYNC_INDEX_INVALID
);
code
=
0
;
code
=
0
;
}
else
if
(
code
==
-
1
&&
terrno
==
TSDB_CODE_SYN_NOT_LEADER
)
{
}
else
{
taosWLockLatch
(
&
pMgmt
->
lock
);
mInfo
(
"trans:%d, failed to proposed since %s"
,
transId
,
terrstr
());
pMgmt
->
transId
=
0
;
taosWUnLockLatch
(
&
pMgmt
->
lock
);
if
(
terrno
==
TSDB_CODE_SYN_NOT_LEADER
)
{
terrno
=
TSDB_CODE_APP_NOT_READY
;
terrno
=
TSDB_CODE_APP_NOT_READY
;
}
else
if
(
code
==
-
1
&&
terrno
==
TSDB_CODE_SYN_INTERNAL_ERROR
)
{
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
}
else
{
}
else
{
terrno
=
TSDB_CODE_APP_ERROR
;
terrno
=
TSDB_CODE_APP_ERROR
;
}
}
}
rpcFreeCont
(
req
.
pCont
);
rpcFreeCont
(
req
.
pCont
);
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
...
@@ -315,6 +303,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
...
@@ -315,6 +303,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
return
code
;
return
code
;
}
}
if
(
pMgmt
->
errCode
!=
0
)
terrno
=
pMgmt
->
errCode
;
return
pMgmt
->
errCode
;
return
pMgmt
->
errCode
;
}
}
...
@@ -328,6 +317,7 @@ void mndSyncStart(SMnode *pMnode) {
...
@@ -328,6 +317,7 @@ void mndSyncStart(SMnode *pMnode) {
void
mndSyncStop
(
SMnode
*
pMnode
)
{
void
mndSyncStop
(
SMnode
*
pMnode
)
{
taosWLockLatch
(
&
pMnode
->
syncMgmt
.
lock
);
taosWLockLatch
(
&
pMnode
->
syncMgmt
.
lock
);
if
(
pMnode
->
syncMgmt
.
transId
!=
0
)
{
if
(
pMnode
->
syncMgmt
.
transId
!=
0
)
{
mInfo
(
"vgId:1, is stopped and post sem, trans:%d"
,
pMnode
->
syncMgmt
.
transId
);
pMnode
->
syncMgmt
.
transId
=
0
;
pMnode
->
syncMgmt
.
transId
=
0
;
tsem_post
(
&
pMnode
->
syncMgmt
.
syncSem
);
tsem_post
(
&
pMnode
->
syncMgmt
.
syncSem
);
}
}
...
...
source/dnode/mnode/impl/src/mndTrans.c
浏览文件 @
11a4ef35
...
@@ -778,7 +778,7 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
...
@@ -778,7 +778,7 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
mInfo
(
"trans:%d, sync to other mnodes, stage:%s"
,
pTrans
->
id
,
mndTransStr
(
pTrans
->
stage
));
mInfo
(
"trans:%d, sync to other mnodes, stage:%s"
,
pTrans
->
id
,
mndTransStr
(
pTrans
->
stage
));
int32_t
code
=
mndSyncPropose
(
pMnode
,
pRaw
,
pTrans
->
id
);
int32_t
code
=
mndSyncPropose
(
pMnode
,
pRaw
,
pTrans
->
id
);
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to sync
since %s"
,
pTrans
->
id
,
terrstr
(
));
mError
(
"trans:%d, failed to sync
, errno:%s code:%s"
,
pTrans
->
id
,
terrstr
(),
tstrerror
(
code
));
sdbFreeRaw
(
pRaw
);
sdbFreeRaw
(
pRaw
);
return
-
1
;
return
-
1
;
}
}
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
11a4ef35
...
@@ -1115,6 +1115,9 @@ SSyncNode* syncNodeOpen(SSyncInfo* pOldSyncInfo) {
...
@@ -1115,6 +1115,9 @@ SSyncNode* syncNodeOpen(SSyncInfo* pOldSyncInfo) {
sError
(
"failed to create raft cfg file. configPath: %s"
,
pSyncNode
->
configPath
);
sError
(
"failed to create raft cfg file. configPath: %s"
,
pSyncNode
->
configPath
);
goto
_error
;
goto
_error
;
}
}
if
(
pSyncInfo
->
syncCfg
.
replicaNum
==
0
)
{
pSyncInfo
->
syncCfg
=
pSyncNode
->
pRaftCfg
->
cfg
;
}
}
else
{
}
else
{
// update syncCfg by raft_config.json
// update syncCfg by raft_config.json
pSyncNode
->
pRaftCfg
=
raftCfgOpen
(
pSyncNode
->
configPath
);
pSyncNode
->
pRaftCfg
=
raftCfgOpen
(
pSyncNode
->
configPath
);
...
@@ -2047,8 +2050,27 @@ inline bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) {
...
@@ -2047,8 +2050,27 @@ inline bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) {
return
b1
;
return
b1
;
}
}
static
bool
syncIsConfigChanged
(
const
SSyncCfg
*
pOldCfg
,
const
SSyncCfg
*
pNewCfg
)
{
if
(
pOldCfg
->
replicaNum
!=
pNewCfg
->
replicaNum
)
return
true
;
if
(
pOldCfg
->
myIndex
!=
pNewCfg
->
myIndex
)
return
true
;
for
(
int32_t
i
=
0
;
i
<
pOldCfg
->
replicaNum
;
++
i
)
{
const
SNodeInfo
*
pOldInfo
=
&
pOldCfg
->
nodeInfo
[
i
];
const
SNodeInfo
*
pNewInfo
=
&
pNewCfg
->
nodeInfo
[
i
];
if
(
strcmp
(
pOldInfo
->
nodeFqdn
,
pNewInfo
->
nodeFqdn
)
!=
0
)
return
true
;
if
(
pOldInfo
->
nodePort
!=
pNewInfo
->
nodePort
)
return
true
;
}
return
false
;
}
void
syncNodeDoConfigChange
(
SSyncNode
*
pSyncNode
,
SSyncCfg
*
pNewConfig
,
SyncIndex
lastConfigChangeIndex
)
{
void
syncNodeDoConfigChange
(
SSyncNode
*
pSyncNode
,
SSyncCfg
*
pNewConfig
,
SyncIndex
lastConfigChangeIndex
)
{
SSyncCfg
oldConfig
=
pSyncNode
->
pRaftCfg
->
cfg
;
SSyncCfg
oldConfig
=
pSyncNode
->
pRaftCfg
->
cfg
;
#if 0
if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
sInfo("vgId:1, sync not reconfig since not changed");
return;
}
#endif
pSyncNode
->
pRaftCfg
->
cfg
=
*
pNewConfig
;
pSyncNode
->
pRaftCfg
->
cfg
=
*
pNewConfig
;
pSyncNode
->
pRaftCfg
->
lastConfigIndex
=
lastConfigChangeIndex
;
pSyncNode
->
pRaftCfg
->
lastConfigIndex
=
lastConfigChangeIndex
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录