Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
fa0f6b50
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
fa0f6b50
编写于
8月 28, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
8月 28, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #16424 from taosdata/fix/dnode
refactor: change mnode log level
上级
17acc9bc
a1b295b1
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
59 addition
and
59 deletion
+59
-59
source/dnode/mnode/impl/src/mndMnode.c
source/dnode/mnode/impl/src/mndMnode.c
+7
-7
source/dnode/mnode/impl/src/mndSync.c
source/dnode/mnode/impl/src/mndSync.c
+14
-14
source/dnode/mnode/impl/src/mndTrans.c
source/dnode/mnode/impl/src/mndTrans.c
+38
-38
未找到文件。
source/dnode/mnode/impl/src/mndMnode.c
浏览文件 @
fa0f6b50
...
@@ -89,14 +89,14 @@ static int32_t mndCreateDefaultMnode(SMnode *pMnode) {
...
@@ -89,14 +89,14 @@ static int32_t mndCreateDefaultMnode(SMnode *pMnode) {
if
(
pRaw
==
NULL
)
return
-
1
;
if
(
pRaw
==
NULL
)
return
-
1
;
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
m
Debug
(
"mnode:%d, will be created when deploying, raw:%p"
,
mnodeObj
.
id
,
pRaw
);
m
Info
(
"mnode:%d, will be created when deploying, raw:%p"
,
mnodeObj
.
id
,
pRaw
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_CONFLICT_GLOBAL
,
NULL
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_CONFLICT_GLOBAL
,
NULL
);
if
(
pTrans
==
NULL
)
{
if
(
pTrans
==
NULL
)
{
mError
(
"mnode:%d, failed to create since %s"
,
mnodeObj
.
id
,
terrstr
());
mError
(
"mnode:%d, failed to create since %s"
,
mnodeObj
.
id
,
terrstr
());
return
-
1
;
return
-
1
;
}
}
m
Debug
(
"trans:%d, used to create mnode:%d"
,
pTrans
->
id
,
mnodeObj
.
id
);
m
Info
(
"trans:%d, used to create mnode:%d"
,
pTrans
->
id
,
mnodeObj
.
id
);
if
(
mndTransAppendCommitlog
(
pTrans
,
pRaw
)
!=
0
)
{
if
(
mndTransAppendCommitlog
(
pTrans
,
pRaw
)
!=
0
)
{
mError
(
"trans:%d, failed to append commit log since %s"
,
pTrans
->
id
,
terrstr
());
mError
(
"trans:%d, failed to append commit log since %s"
,
pTrans
->
id
,
terrstr
());
...
@@ -365,7 +365,7 @@ static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
...
@@ -365,7 +365,7 @@ static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_CONFLICT_GLOBAL
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_CONFLICT_GLOBAL
,
pReq
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
if
(
pTrans
==
NULL
)
goto
_OVER
;
mndTransSetSerial
(
pTrans
);
mndTransSetSerial
(
pTrans
);
m
Debug
(
"trans:%d, used to create mnode:%d"
,
pTrans
->
id
,
pCreate
->
dnodeId
);
m
Info
(
"trans:%d, used to create mnode:%d"
,
pTrans
->
id
,
pCreate
->
dnodeId
);
if
(
mndSetCreateMnodeRedoLogs
(
pMnode
,
pTrans
,
&
mnodeObj
)
!=
0
)
goto
_OVER
;
if
(
mndSetCreateMnodeRedoLogs
(
pMnode
,
pTrans
,
&
mnodeObj
)
!=
0
)
goto
_OVER
;
if
(
mndSetCreateMnodeCommitLogs
(
pMnode
,
pTrans
,
&
mnodeObj
)
!=
0
)
goto
_OVER
;
if
(
mndSetCreateMnodeCommitLogs
(
pMnode
,
pTrans
,
&
mnodeObj
)
!=
0
)
goto
_OVER
;
...
@@ -392,7 +392,7 @@ static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq) {
...
@@ -392,7 +392,7 @@ static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq) {
goto
_OVER
;
goto
_OVER
;
}
}
m
Debug
(
"mnode:%d, start to create"
,
createReq
.
dnodeId
);
m
Info
(
"mnode:%d, start to create"
,
createReq
.
dnodeId
);
if
(
mndCheckOperPrivilege
(
pMnode
,
pReq
->
info
.
conn
.
user
,
MND_OPER_CREATE_MNODE
)
!=
0
)
{
if
(
mndCheckOperPrivilege
(
pMnode
,
pReq
->
info
.
conn
.
user
,
MND_OPER_CREATE_MNODE
)
!=
0
)
{
goto
_OVER
;
goto
_OVER
;
}
}
...
@@ -574,7 +574,7 @@ static int32_t mndDropMnode(SMnode *pMnode, SRpcMsg *pReq, SMnodeObj *pObj) {
...
@@ -574,7 +574,7 @@ static int32_t mndDropMnode(SMnode *pMnode, SRpcMsg *pReq, SMnodeObj *pObj) {
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_CONFLICT_GLOBAL
,
pReq
);
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_CONFLICT_GLOBAL
,
pReq
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
if
(
pTrans
==
NULL
)
goto
_OVER
;
mndTransSetSerial
(
pTrans
);
mndTransSetSerial
(
pTrans
);
m
Debug
(
"trans:%d, used to drop mnode:%d"
,
pTrans
->
id
,
pObj
->
id
);
m
Info
(
"trans:%d, used to drop mnode:%d"
,
pTrans
->
id
,
pObj
->
id
);
if
(
mndSetDropMnodeInfoToTrans
(
pMnode
,
pTrans
,
pObj
)
!=
0
)
goto
_OVER
;
if
(
mndSetDropMnodeInfoToTrans
(
pMnode
,
pTrans
,
pObj
)
!=
0
)
goto
_OVER
;
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
...
@@ -597,7 +597,7 @@ static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq) {
...
@@ -597,7 +597,7 @@ static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq) {
goto
_OVER
;
goto
_OVER
;
}
}
m
Debug
(
"mnode:%d, start to drop"
,
dropReq
.
dnodeId
);
m
Info
(
"mnode:%d, start to drop"
,
dropReq
.
dnodeId
);
if
(
mndCheckOperPrivilege
(
pMnode
,
pReq
->
info
.
conn
.
user
,
MND_OPER_DROP_MNODE
)
!=
0
)
{
if
(
mndCheckOperPrivilege
(
pMnode
,
pReq
->
info
.
conn
.
user
,
MND_OPER_DROP_MNODE
)
!=
0
)
{
goto
_OVER
;
goto
_OVER
;
}
}
...
@@ -732,7 +732,7 @@ static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq) {
...
@@ -732,7 +732,7 @@ static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq) {
}
}
}
}
m
Trace
(
"trans:-1, sync reconfig will be proposed"
);
m
Info
(
"trans:-1, sync reconfig will be proposed"
);
SSyncMgmt
*
pMgmt
=
&
pMnode
->
syncMgmt
;
SSyncMgmt
*
pMgmt
=
&
pMnode
->
syncMgmt
;
pMgmt
->
standby
=
0
;
pMgmt
->
standby
=
0
;
...
...
source/dnode/mnode/impl/src/mndSync.c
浏览文件 @
fa0f6b50
...
@@ -50,7 +50,7 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
...
@@ -50,7 +50,7 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
int32_t
transId
=
sdbGetIdFromRaw
(
pMnode
->
pSdb
,
pRaw
);
int32_t
transId
=
sdbGetIdFromRaw
(
pMnode
->
pSdb
,
pRaw
);
pMgmt
->
errCode
=
cbMeta
.
code
;
pMgmt
->
errCode
=
cbMeta
.
code
;
m
Debug
(
"trans:%d, is proposed, saved:%d code:0x%x, apply index:%"
PRId64
" term:%"
PRIu64
" config:%"
PRId64
m
Info
(
"trans:%d, is proposed, saved:%d code:0x%x, apply index:%"
PRId64
" term:%"
PRIu64
" config:%"
PRId64
" role:%s raw:%p"
,
" role:%s raw:%p"
,
transId
,
pMgmt
->
transId
,
cbMeta
.
code
,
cbMeta
.
index
,
cbMeta
.
term
,
cbMeta
.
lastConfigIndex
,
syncStr
(
cbMeta
.
state
),
transId
,
pMgmt
->
transId
,
cbMeta
.
code
,
cbMeta
.
index
,
cbMeta
.
term
,
cbMeta
.
lastConfigIndex
,
syncStr
(
cbMeta
.
state
),
pRaw
);
pRaw
);
...
@@ -68,7 +68,7 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
...
@@ -68,7 +68,7 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
if
(
pMgmt
->
errCode
!=
0
)
{
if
(
pMgmt
->
errCode
!=
0
)
{
mError
(
"trans:%d, failed to propose since %s, post sem"
,
transId
,
tstrerror
(
pMgmt
->
errCode
));
mError
(
"trans:%d, failed to propose since %s, post sem"
,
transId
,
tstrerror
(
pMgmt
->
errCode
));
}
else
{
}
else
{
m
Debug
(
"trans:%d, is proposed and post sem"
,
transId
,
tstrerror
(
pMgmt
->
errCode
));
m
Info
(
"trans:%d, is proposed and post sem"
,
transId
,
tstrerror
(
pMgmt
->
errCode
));
}
}
pMgmt
->
transId
=
0
;
pMgmt
->
transId
=
0
;
taosWUnLockLatch
(
&
pMgmt
->
lock
);
taosWUnLockLatch
(
&
pMgmt
->
lock
);
...
@@ -88,7 +88,7 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
...
@@ -88,7 +88,7 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
}
}
int32_t
mndSyncGetSnapshot
(
struct
SSyncFSM
*
pFsm
,
SSnapshot
*
pSnapshot
,
void
*
pReaderParam
,
void
**
ppReader
)
{
int32_t
mndSyncGetSnapshot
(
struct
SSyncFSM
*
pFsm
,
SSnapshot
*
pSnapshot
,
void
*
pReaderParam
,
void
**
ppReader
)
{
m
Debug
(
"start to read snapshot from sdb in atomic way"
);
m
Info
(
"start to read snapshot from sdb in atomic way"
);
SMnode
*
pMnode
=
pFsm
->
data
;
SMnode
*
pMnode
=
pFsm
->
data
;
return
sdbStartRead
(
pMnode
->
pSdb
,
(
SSdbIter
**
)
ppReader
,
&
pSnapshot
->
lastApplyIndex
,
&
pSnapshot
->
lastApplyTerm
,
return
sdbStartRead
(
pMnode
->
pSdb
,
(
SSdbIter
**
)
ppReader
,
&
pSnapshot
->
lastApplyIndex
,
&
pSnapshot
->
lastApplyTerm
,
&
pSnapshot
->
lastConfigIndex
);
&
pSnapshot
->
lastConfigIndex
);
...
@@ -118,7 +118,7 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM
...
@@ -118,7 +118,7 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM
SSyncMgmt
*
pMgmt
=
&
pMnode
->
syncMgmt
;
SSyncMgmt
*
pMgmt
=
&
pMnode
->
syncMgmt
;
pMgmt
->
errCode
=
cbMeta
.
code
;
pMgmt
->
errCode
=
cbMeta
.
code
;
m
Debug
(
"trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%"
PRId64
" term:%"
PRId64
,
pMgmt
->
transId
,
m
Info
(
"trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%"
PRId64
" term:%"
PRId64
,
pMgmt
->
transId
,
cbMeta
.
code
,
cbMeta
.
index
,
cbMeta
.
term
);
cbMeta
.
code
,
cbMeta
.
index
,
cbMeta
.
term
);
taosWLockLatch
(
&
pMgmt
->
lock
);
taosWLockLatch
(
&
pMgmt
->
lock
);
...
@@ -126,7 +126,7 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM
...
@@ -126,7 +126,7 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM
if
(
pMgmt
->
errCode
!=
0
)
{
if
(
pMgmt
->
errCode
!=
0
)
{
mError
(
"trans:-1, failed to propose sync reconfig since %s, post sem"
,
tstrerror
(
pMgmt
->
errCode
));
mError
(
"trans:-1, failed to propose sync reconfig since %s, post sem"
,
tstrerror
(
pMgmt
->
errCode
));
}
else
{
}
else
{
m
Debug
(
"trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%"
PRId64
" term:%"
PRId64
" post sem"
,
m
Info
(
"trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%"
PRId64
" term:%"
PRId64
" post sem"
,
pMgmt
->
transId
,
cbMeta
.
code
,
cbMeta
.
index
,
cbMeta
.
term
);
pMgmt
->
transId
,
cbMeta
.
code
,
cbMeta
.
index
,
cbMeta
.
term
);
}
}
pMgmt
->
transId
=
0
;
pMgmt
->
transId
=
0
;
...
@@ -136,13 +136,13 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM
...
@@ -136,13 +136,13 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM
}
}
int32_t
mndSnapshotStartRead
(
struct
SSyncFSM
*
pFsm
,
void
*
pParam
,
void
**
ppReader
)
{
int32_t
mndSnapshotStartRead
(
struct
SSyncFSM
*
pFsm
,
void
*
pParam
,
void
**
ppReader
)
{
m
Debug
(
"start to read snapshot from sdb"
);
m
Info
(
"start to read snapshot from sdb"
);
SMnode
*
pMnode
=
pFsm
->
data
;
SMnode
*
pMnode
=
pFsm
->
data
;
return
sdbStartRead
(
pMnode
->
pSdb
,
(
SSdbIter
**
)
ppReader
,
NULL
,
NULL
,
NULL
);
return
sdbStartRead
(
pMnode
->
pSdb
,
(
SSdbIter
**
)
ppReader
,
NULL
,
NULL
,
NULL
);
}
}
int32_t
mndSnapshotStopRead
(
struct
SSyncFSM
*
pFsm
,
void
*
pReader
)
{
int32_t
mndSnapshotStopRead
(
struct
SSyncFSM
*
pFsm
,
void
*
pReader
)
{
m
Debug
(
"stop to read snapshot from sdb"
);
m
Info
(
"stop to read snapshot from sdb"
);
SMnode
*
pMnode
=
pFsm
->
data
;
SMnode
*
pMnode
=
pFsm
->
data
;
return
sdbStopRead
(
pMnode
->
pSdb
,
pReader
);
return
sdbStopRead
(
pMnode
->
pSdb
,
pReader
);
}
}
...
@@ -174,12 +174,12 @@ int32_t mndSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int
...
@@ -174,12 +174,12 @@ int32_t mndSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int
void
mndLeaderTransfer
(
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
SFsmCbMeta
cbMeta
)
{
void
mndLeaderTransfer
(
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
SFsmCbMeta
cbMeta
)
{
SMnode
*
pMnode
=
pFsm
->
data
;
SMnode
*
pMnode
=
pFsm
->
data
;
atomic_store_8
(
&
(
pMnode
->
syncMgmt
.
leaderTransferFinish
),
1
);
atomic_store_8
(
&
(
pMnode
->
syncMgmt
.
leaderTransferFinish
),
1
);
m
Debug
(
"vgId:1, mnode leader transfer finish"
);
m
Info
(
"vgId:1, mnode leader transfer finish"
);
}
}
static
void
mndBecomeFollower
(
struct
SSyncFSM
*
pFsm
)
{
static
void
mndBecomeFollower
(
struct
SSyncFSM
*
pFsm
)
{
SMnode
*
pMnode
=
pFsm
->
data
;
SMnode
*
pMnode
=
pFsm
->
data
;
m
Debug
(
"vgId:1, become follower and post sem"
);
m
Info
(
"vgId:1, become follower and post sem"
);
taosWLockLatch
(
&
pMnode
->
syncMgmt
.
lock
);
taosWLockLatch
(
&
pMnode
->
syncMgmt
.
lock
);
if
(
pMnode
->
syncMgmt
.
transId
!=
0
)
{
if
(
pMnode
->
syncMgmt
.
transId
!=
0
)
{
...
@@ -190,7 +190,7 @@ static void mndBecomeFollower(struct SSyncFSM *pFsm) {
...
@@ -190,7 +190,7 @@ static void mndBecomeFollower(struct SSyncFSM *pFsm) {
}
}
static
void
mndBecomeLeader
(
struct
SSyncFSM
*
pFsm
)
{
static
void
mndBecomeLeader
(
struct
SSyncFSM
*
pFsm
)
{
m
Debug
(
"vgId:1, become leader"
);
m
Info
(
"vgId:1, become leader"
);
SMnode
*
pMnode
=
pFsm
->
data
;
SMnode
*
pMnode
=
pFsm
->
data
;
}
}
...
@@ -228,7 +228,7 @@ int32_t mndInitSync(SMnode *pMnode) {
...
@@ -228,7 +228,7 @@ int32_t mndInitSync(SMnode *pMnode) {
syncInfo
.
isStandBy
=
pMgmt
->
standby
;
syncInfo
.
isStandBy
=
pMgmt
->
standby
;
syncInfo
.
snapshotStrategy
=
SYNC_STRATEGY_STANDARD_SNAPSHOT
;
syncInfo
.
snapshotStrategy
=
SYNC_STRATEGY_STANDARD_SNAPSHOT
;
m
Debug
(
"start to open mnode sync, standby:%d"
,
pMgmt
->
standby
);
m
Info
(
"start to open mnode sync, standby:%d"
,
pMgmt
->
standby
);
if
(
pMgmt
->
standby
||
pMgmt
->
replica
.
id
>
0
)
{
if
(
pMgmt
->
standby
||
pMgmt
->
replica
.
id
>
0
)
{
SSyncCfg
*
pCfg
=
&
syncInfo
.
syncCfg
;
SSyncCfg
*
pCfg
=
&
syncInfo
.
syncCfg
;
pCfg
->
replicaNum
=
1
;
pCfg
->
replicaNum
=
1
;
...
@@ -236,7 +236,7 @@ int32_t mndInitSync(SMnode *pMnode) {
...
@@ -236,7 +236,7 @@ int32_t mndInitSync(SMnode *pMnode) {
SNodeInfo
*
pNode
=
&
pCfg
->
nodeInfo
[
0
];
SNodeInfo
*
pNode
=
&
pCfg
->
nodeInfo
[
0
];
tstrncpy
(
pNode
->
nodeFqdn
,
pMgmt
->
replica
.
fqdn
,
sizeof
(
pNode
->
nodeFqdn
));
tstrncpy
(
pNode
->
nodeFqdn
,
pMgmt
->
replica
.
fqdn
,
sizeof
(
pNode
->
nodeFqdn
));
pNode
->
nodePort
=
pMgmt
->
replica
.
port
;
pNode
->
nodePort
=
pMgmt
->
replica
.
port
;
m
Debug
(
"mnode ep:%s:%u"
,
pNode
->
nodeFqdn
,
pNode
->
nodePort
);
m
Info
(
"mnode ep:%s:%u"
,
pNode
->
nodeFqdn
,
pNode
->
nodePort
);
}
}
tsem_init
(
&
pMgmt
->
syncSem
,
0
,
0
);
tsem_init
(
&
pMgmt
->
syncSem
,
0
,
0
);
...
@@ -284,7 +284,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
...
@@ -284,7 +284,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
return
-
1
;
return
-
1
;
}
else
{
}
else
{
pMgmt
->
transId
=
transId
;
pMgmt
->
transId
=
transId
;
m
Debug
(
"trans:%d, will be proposed"
,
pMgmt
->
transId
);
m
Info
(
"trans:%d, will be proposed"
,
pMgmt
->
transId
);
taosWUnLockLatch
(
&
pMgmt
->
lock
);
taosWUnLockLatch
(
&
pMgmt
->
lock
);
}
}
...
@@ -314,7 +314,7 @@ void mndSyncStart(SMnode *pMnode) {
...
@@ -314,7 +314,7 @@ void mndSyncStart(SMnode *pMnode) {
SSyncMgmt
*
pMgmt
=
&
pMnode
->
syncMgmt
;
SSyncMgmt
*
pMgmt
=
&
pMnode
->
syncMgmt
;
syncSetMsgCb
(
pMgmt
->
sync
,
&
pMnode
->
msgCb
);
syncSetMsgCb
(
pMgmt
->
sync
,
&
pMnode
->
msgCb
);
syncStart
(
pMgmt
->
sync
);
syncStart
(
pMgmt
->
sync
);
m
Debug
(
"mnode sync started, id:%"
PRId64
" standby:%d"
,
pMgmt
->
sync
,
pMgmt
->
standby
);
m
Info
(
"mnode sync started, id:%"
PRId64
" standby:%d"
,
pMgmt
->
sync
,
pMgmt
->
standby
);
}
}
void
mndSyncStop
(
SMnode
*
pMnode
)
{
void
mndSyncStop
(
SMnode
*
pMnode
)
{
...
...
source/dnode/mnode/impl/src/mndTrans.c
浏览文件 @
fa0f6b50
...
@@ -456,11 +456,11 @@ static const char *mndTransStr(ETrnStage stage) {
...
@@ -456,11 +456,11 @@ static const char *mndTransStr(ETrnStage stage) {
}
}
static
void
mndTransTestStartFunc
(
SMnode
*
pMnode
,
void
*
param
,
int32_t
paramLen
)
{
static
void
mndTransTestStartFunc
(
SMnode
*
pMnode
,
void
*
param
,
int32_t
paramLen
)
{
m
Debug
(
"test trans start, param:%s, len:%d"
,
(
char
*
)
param
,
paramLen
);
m
Info
(
"test trans start, param:%s, len:%d"
,
(
char
*
)
param
,
paramLen
);
}
}
static
void
mndTransTestStopFunc
(
SMnode
*
pMnode
,
void
*
param
,
int32_t
paramLen
)
{
static
void
mndTransTestStopFunc
(
SMnode
*
pMnode
,
void
*
param
,
int32_t
paramLen
)
{
m
Debug
(
"test trans stop, param:%s, len:%d"
,
(
char
*
)
param
,
paramLen
);
m
Info
(
"test trans stop, param:%s, len:%d"
,
(
char
*
)
param
,
paramLen
);
}
}
static
TransCbFp
mndTransGetCbFp
(
ETrnFunc
ftype
)
{
static
TransCbFp
mndTransGetCbFp
(
ETrnFunc
ftype
)
{
...
@@ -707,7 +707,7 @@ int32_t mndSetRpcInfoForDbTrans(SMnode *pMnode, SRpcMsg *pMsg, EOperType oper, c
...
@@ -707,7 +707,7 @@ int32_t mndSetRpcInfoForDbTrans(SMnode *pMnode, SRpcMsg *pMsg, EOperType oper, c
if
(
pTrans
->
oper
==
oper
)
{
if
(
pTrans
->
oper
==
oper
)
{
if
(
strcasecmp
(
dbname
,
pTrans
->
dbname1
)
==
0
)
{
if
(
strcasecmp
(
dbname
,
pTrans
->
dbname1
)
==
0
)
{
m
Debug
(
"trans:%d, db:%s oper:%d matched with input"
,
pTrans
->
id
,
dbname
,
oper
);
m
Info
(
"trans:%d, db:%s oper:%d matched with input"
,
pTrans
->
id
,
dbname
,
oper
);
if
(
pTrans
->
pRpcArray
==
NULL
)
{
if
(
pTrans
->
pRpcArray
==
NULL
)
{
pTrans
->
pRpcArray
=
taosArrayInit
(
1
,
sizeof
(
SRpcHandleInfo
));
pTrans
->
pRpcArray
=
taosArrayInit
(
1
,
sizeof
(
SRpcHandleInfo
));
}
}
...
@@ -746,7 +746,7 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
...
@@ -746,7 +746,7 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
}
}
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
m
Debug
(
"trans:%d, sync to other mnodes, stage:%s"
,
pTrans
->
id
,
mndTransStr
(
pTrans
->
stage
));
m
Info
(
"trans:%d, sync to other mnodes, stage:%s"
,
pTrans
->
id
,
mndTransStr
(
pTrans
->
stage
));
int32_t
code
=
mndSyncPropose
(
pMnode
,
pRaw
,
pTrans
->
id
);
int32_t
code
=
mndSyncPropose
(
pMnode
,
pRaw
,
pTrans
->
id
);
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to sync since %s"
,
pTrans
->
id
,
terrstr
());
mError
(
"trans:%d, failed to sync since %s"
,
pTrans
->
id
,
terrstr
());
...
@@ -755,7 +755,7 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
...
@@ -755,7 +755,7 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
}
}
sdbFreeRaw
(
pRaw
);
sdbFreeRaw
(
pRaw
);
m
Debug
(
"trans:%d, sync finished"
,
pTrans
->
id
);
m
Info
(
"trans:%d, sync finished"
,
pTrans
->
id
);
return
0
;
return
0
;
}
}
...
@@ -821,12 +821,12 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
...
@@ -821,12 +821,12 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
return
-
1
;
return
-
1
;
}
}
m
Debug
(
"trans:%d, prepare transaction"
,
pTrans
->
id
);
m
Info
(
"trans:%d, prepare transaction"
,
pTrans
->
id
);
if
(
mndTransSync
(
pMnode
,
pTrans
)
!=
0
)
{
if
(
mndTransSync
(
pMnode
,
pTrans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
return
-
1
;
return
-
1
;
}
}
m
Debug
(
"trans:%d, prepare finished"
,
pTrans
->
id
);
m
Info
(
"trans:%d, prepare finished"
,
pTrans
->
id
);
STrans
*
pNew
=
mndAcquireTrans
(
pMnode
,
pTrans
->
id
);
STrans
*
pNew
=
mndAcquireTrans
(
pMnode
,
pTrans
->
id
);
if
(
pNew
==
NULL
)
{
if
(
pNew
==
NULL
)
{
...
@@ -847,22 +847,22 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
...
@@ -847,22 +847,22 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
}
}
static
int32_t
mndTransCommit
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
static
int32_t
mndTransCommit
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
m
Debug
(
"trans:%d, commit transaction"
,
pTrans
->
id
);
m
Info
(
"trans:%d, commit transaction"
,
pTrans
->
id
);
if
(
mndTransSync
(
pMnode
,
pTrans
)
!=
0
)
{
if
(
mndTransSync
(
pMnode
,
pTrans
)
!=
0
)
{
mError
(
"trans:%d, failed to commit since %s"
,
pTrans
->
id
,
terrstr
());
mError
(
"trans:%d, failed to commit since %s"
,
pTrans
->
id
,
terrstr
());
return
-
1
;
return
-
1
;
}
}
m
Debug
(
"trans:%d, commit finished"
,
pTrans
->
id
);
m
Info
(
"trans:%d, commit finished"
,
pTrans
->
id
);
return
0
;
return
0
;
}
}
static
int32_t
mndTransRollback
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
static
int32_t
mndTransRollback
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
m
Debug
(
"trans:%d, rollback transaction"
,
pTrans
->
id
);
m
Info
(
"trans:%d, rollback transaction"
,
pTrans
->
id
);
if
(
mndTransSync
(
pMnode
,
pTrans
)
!=
0
)
{
if
(
mndTransSync
(
pMnode
,
pTrans
)
!=
0
)
{
mError
(
"trans:%d, failed to rollback since %s"
,
pTrans
->
id
,
terrstr
());
mError
(
"trans:%d, failed to rollback since %s"
,
pTrans
->
id
,
terrstr
());
return
-
1
;
return
-
1
;
}
}
m
Debug
(
"trans:%d, rollback finished"
,
pTrans
->
id
);
m
Info
(
"trans:%d, rollback finished"
,
pTrans
->
id
);
return
0
;
return
0
;
}
}
...
@@ -894,7 +894,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
...
@@ -894,7 +894,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
SRpcHandleInfo
*
pInfo
=
taosArrayGet
(
pTrans
->
pRpcArray
,
i
);
SRpcHandleInfo
*
pInfo
=
taosArrayGet
(
pTrans
->
pRpcArray
,
i
);
if
(
pInfo
->
handle
!=
NULL
)
{
if
(
pInfo
->
handle
!=
NULL
)
{
m
Debug
(
"trans:%d, send rsp, code:0x%x stage:%s app:%p"
,
pTrans
->
id
,
code
,
mndTransStr
(
pTrans
->
stage
),
m
Info
(
"trans:%d, send rsp, code:0x%x stage:%s app:%p"
,
pTrans
->
id
,
code
,
mndTransStr
(
pTrans
->
stage
),
pInfo
->
ahandle
);
pInfo
->
ahandle
);
if
(
code
==
TSDB_CODE_RPC_NETWORK_UNAVAIL
)
{
if
(
code
==
TSDB_CODE_RPC_NETWORK_UNAVAIL
)
{
code
=
TSDB_CODE_MND_TRANS_NETWORK_UNAVAILL
;
code
=
TSDB_CODE_MND_TRANS_NETWORK_UNAVAILL
;
...
@@ -902,13 +902,13 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
...
@@ -902,13 +902,13 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
SRpcMsg
rspMsg
=
{.
code
=
code
,
.
info
=
*
pInfo
};
SRpcMsg
rspMsg
=
{.
code
=
code
,
.
info
=
*
pInfo
};
if
(
pTrans
->
originRpcType
==
TDMT_MND_CREATE_DB
)
{
if
(
pTrans
->
originRpcType
==
TDMT_MND_CREATE_DB
)
{
m
Debug
(
"trans:%d, origin msgtype:%s"
,
pTrans
->
id
,
TMSG_INFO
(
pTrans
->
originRpcType
));
m
Info
(
"trans:%d, origin msgtype:%s"
,
pTrans
->
id
,
TMSG_INFO
(
pTrans
->
originRpcType
));
SDbObj
*
pDb
=
mndAcquireDb
(
pMnode
,
pTrans
->
dbname1
);
SDbObj
*
pDb
=
mndAcquireDb
(
pMnode
,
pTrans
->
dbname1
);
if
(
pDb
!=
NULL
)
{
if
(
pDb
!=
NULL
)
{
for
(
int32_t
j
=
0
;
j
<
12
;
j
++
)
{
for
(
int32_t
j
=
0
;
j
<
12
;
j
++
)
{
bool
ready
=
mndIsDbReady
(
pMnode
,
pDb
);
bool
ready
=
mndIsDbReady
(
pMnode
,
pDb
);
if
(
!
ready
)
{
if
(
!
ready
)
{
m
Debug
(
"trans:%d, db:%s not ready yet, wait %d times"
,
pTrans
->
id
,
pTrans
->
dbname1
,
j
);
m
Info
(
"trans:%d, db:%s not ready yet, wait %d times"
,
pTrans
->
id
,
pTrans
->
dbname1
,
j
);
taosMsleep
(
1000
);
taosMsleep
(
1000
);
}
else
{
}
else
{
break
;
break
;
...
@@ -978,7 +978,7 @@ int32_t mndTransProcessRsp(SRpcMsg *pRsp) {
...
@@ -978,7 +978,7 @@ int32_t mndTransProcessRsp(SRpcMsg *pRsp) {
pAction
->
errCode
=
pRsp
->
code
;
pAction
->
errCode
=
pRsp
->
code
;
}
}
m
Debug
(
"trans:%d, %s:%d response is received, code:0x%x, accept:0x%x retry:0x%x"
,
transId
,
m
Info
(
"trans:%d, %s:%d response is received, code:0x%x, accept:0x%x retry:0x%x"
,
transId
,
mndTransStr
(
pAction
->
stage
),
action
,
pRsp
->
code
,
pAction
->
acceptableCode
,
pAction
->
retryCode
);
mndTransStr
(
pAction
->
stage
),
action
,
pRsp
->
code
,
pAction
->
acceptableCode
,
pAction
->
retryCode
);
mndTransExecute
(
pMnode
,
pTrans
);
mndTransExecute
(
pMnode
,
pTrans
);
...
@@ -994,10 +994,10 @@ static void mndTransResetAction(SMnode *pMnode, STrans *pTrans, STransAction *pA
...
@@ -994,10 +994,10 @@ static void mndTransResetAction(SMnode *pMnode, STrans *pTrans, STransAction *pA
if
(
pAction
->
errCode
==
TSDB_CODE_RPC_REDIRECT
||
pAction
->
errCode
==
TSDB_CODE_SYN_NEW_CONFIG_ERROR
||
if
(
pAction
->
errCode
==
TSDB_CODE_RPC_REDIRECT
||
pAction
->
errCode
==
TSDB_CODE_SYN_NEW_CONFIG_ERROR
||
pAction
->
errCode
==
TSDB_CODE_SYN_INTERNAL_ERROR
||
pAction
->
errCode
==
TSDB_CODE_SYN_NOT_LEADER
)
{
pAction
->
errCode
==
TSDB_CODE_SYN_INTERNAL_ERROR
||
pAction
->
errCode
==
TSDB_CODE_SYN_NOT_LEADER
)
{
pAction
->
epSet
.
inUse
=
(
pAction
->
epSet
.
inUse
+
1
)
%
pAction
->
epSet
.
numOfEps
;
pAction
->
epSet
.
inUse
=
(
pAction
->
epSet
.
inUse
+
1
)
%
pAction
->
epSet
.
numOfEps
;
m
Debug
(
"trans:%d, %s:%d execute status is reset and set epset inuse:%d"
,
pTrans
->
id
,
mndTransStr
(
pAction
->
stage
),
m
Info
(
"trans:%d, %s:%d execute status is reset and set epset inuse:%d"
,
pTrans
->
id
,
mndTransStr
(
pAction
->
stage
),
pAction
->
id
,
pAction
->
epSet
.
inUse
);
pAction
->
id
,
pAction
->
epSet
.
inUse
);
}
else
{
}
else
{
m
Debug
(
"trans:%d, %s:%d execute status is reset"
,
pTrans
->
id
,
mndTransStr
(
pAction
->
stage
),
pAction
->
id
);
m
Info
(
"trans:%d, %s:%d execute status is reset"
,
pTrans
->
id
,
mndTransStr
(
pAction
->
stage
),
pAction
->
id
);
}
}
pAction
->
errCode
=
0
;
pAction
->
errCode
=
0
;
}
}
...
@@ -1024,7 +1024,7 @@ static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransActi
...
@@ -1024,7 +1024,7 @@ static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransActi
pAction
->
rawWritten
=
true
;
pAction
->
rawWritten
=
true
;
pAction
->
errCode
=
0
;
pAction
->
errCode
=
0
;
code
=
0
;
code
=
0
;
m
Debug
(
"trans:%d, %s:%d write to sdb, type:%s status:%s"
,
pTrans
->
id
,
mndTransStr
(
pAction
->
stage
),
pAction
->
id
,
m
Info
(
"trans:%d, %s:%d write to sdb, type:%s status:%s"
,
pTrans
->
id
,
mndTransStr
(
pAction
->
stage
),
pAction
->
id
,
sdbTableName
(
pAction
->
pRaw
->
type
),
sdbStatusName
(
pAction
->
pRaw
->
status
));
sdbTableName
(
pAction
->
pRaw
->
type
),
sdbStatusName
(
pAction
->
pRaw
->
status
));
pTrans
->
lastAction
=
pAction
->
id
;
pTrans
->
lastAction
=
pAction
->
id
;
...
@@ -1073,7 +1073,7 @@ static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransActio
...
@@ -1073,7 +1073,7 @@ static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransActio
pAction
->
msgSent
=
1
;
pAction
->
msgSent
=
1
;
pAction
->
msgReceived
=
0
;
pAction
->
msgReceived
=
0
;
pAction
->
errCode
=
0
;
pAction
->
errCode
=
0
;
m
Debug
(
"trans:%d, %s:%d is sent, %s"
,
pTrans
->
id
,
mndTransStr
(
pAction
->
stage
),
pAction
->
id
,
detail
);
m
Info
(
"trans:%d, %s:%d is sent, %s"
,
pTrans
->
id
,
mndTransStr
(
pAction
->
stage
),
pAction
->
id
,
detail
);
pTrans
->
lastAction
=
pAction
->
id
;
pTrans
->
lastAction
=
pAction
->
id
;
pTrans
->
lastMsgType
=
pAction
->
msgType
;
pTrans
->
lastMsgType
=
pAction
->
msgType
;
...
@@ -1100,7 +1100,7 @@ static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransActio
...
@@ -1100,7 +1100,7 @@ static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransActio
static
int32_t
mndTransExecNullMsg
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
STransAction
*
pAction
)
{
static
int32_t
mndTransExecNullMsg
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
STransAction
*
pAction
)
{
pAction
->
rawWritten
=
0
;
pAction
->
rawWritten
=
0
;
pAction
->
errCode
=
0
;
pAction
->
errCode
=
0
;
m
Debug
(
"trans:%d, %s:%d confirm action executed"
,
pTrans
->
id
,
mndTransStr
(
pAction
->
stage
),
pAction
->
id
);
m
Info
(
"trans:%d, %s:%d confirm action executed"
,
pTrans
->
id
,
mndTransStr
(
pAction
->
stage
),
pAction
->
id
);
pTrans
->
lastAction
=
pAction
->
id
;
pTrans
->
lastAction
=
pAction
->
id
;
pTrans
->
lastMsgType
=
pAction
->
msgType
;
pTrans
->
lastMsgType
=
pAction
->
msgType
;
...
@@ -1160,7 +1160,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
...
@@ -1160,7 +1160,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
pTrans
->
lastMsgType
=
0
;
pTrans
->
lastMsgType
=
0
;
memset
(
&
pTrans
->
lastEpset
,
0
,
sizeof
(
pTrans
->
lastEpset
));
memset
(
&
pTrans
->
lastEpset
,
0
,
sizeof
(
pTrans
->
lastEpset
));
pTrans
->
lastErrorNo
=
0
;
pTrans
->
lastErrorNo
=
0
;
m
Debug
(
"trans:%d, all %d actions execute successfully"
,
pTrans
->
id
,
numOfActions
);
m
Info
(
"trans:%d, all %d actions execute successfully"
,
pTrans
->
id
,
numOfActions
);
return
0
;
return
0
;
}
else
{
}
else
{
mError
(
"trans:%d, all %d actions executed, code:0x%x"
,
pTrans
->
id
,
numOfActions
,
errCode
&
0XFFFF
);
mError
(
"trans:%d, all %d actions executed, code:0x%x"
,
pTrans
->
id
,
numOfActions
,
errCode
&
0XFFFF
);
...
@@ -1175,7 +1175,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
...
@@ -1175,7 +1175,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
return
errCode
;
return
errCode
;
}
}
}
else
{
}
else
{
m
Debug
(
"trans:%d, %d of %d actions executed"
,
pTrans
->
id
,
numOfExecuted
,
numOfActions
);
m
Info
(
"trans:%d, %d of %d actions executed"
,
pTrans
->
id
,
numOfExecuted
,
numOfActions
);
return
TSDB_CODE_ACTION_IN_PROGRESS
;
return
TSDB_CODE_ACTION_IN_PROGRESS
;
}
}
}
}
...
@@ -1221,7 +1221,7 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
...
@@ -1221,7 +1221,7 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
code
=
pAction
->
errCode
;
code
=
pAction
->
errCode
;
mndTransResetAction
(
pMnode
,
pTrans
,
pAction
);
mndTransResetAction
(
pMnode
,
pTrans
,
pAction
);
}
else
{
}
else
{
m
Debug
(
"trans:%d, %s:%d execute successfully"
,
pTrans
->
id
,
mndTransStr
(
pAction
->
stage
),
action
);
m
Info
(
"trans:%d, %s:%d execute successfully"
,
pTrans
->
id
,
mndTransStr
(
pAction
->
stage
),
action
);
}
}
}
else
{
}
else
{
code
=
TSDB_CODE_ACTION_IN_PROGRESS
;
code
=
TSDB_CODE_ACTION_IN_PROGRESS
;
...
@@ -1230,7 +1230,7 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
...
@@ -1230,7 +1230,7 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
if
(
pAction
->
errCode
!=
0
&&
pAction
->
errCode
!=
pAction
->
acceptableCode
)
{
if
(
pAction
->
errCode
!=
0
&&
pAction
->
errCode
!=
pAction
->
acceptableCode
)
{
code
=
pAction
->
errCode
;
code
=
pAction
->
errCode
;
}
else
{
}
else
{
m
Debug
(
"trans:%d, %s:%d write successfully"
,
pTrans
->
id
,
mndTransStr
(
pAction
->
stage
),
action
);
m
Info
(
"trans:%d, %s:%d write successfully"
,
pTrans
->
id
,
mndTransStr
(
pAction
->
stage
),
action
);
}
}
}
else
{
}
else
{
}
}
...
@@ -1254,7 +1254,7 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
...
@@ -1254,7 +1254,7 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
if
(
code
==
0
)
{
if
(
code
==
0
)
{
pTrans
->
code
=
0
;
pTrans
->
code
=
0
;
pTrans
->
redoActionPos
++
;
pTrans
->
redoActionPos
++
;
m
Debug
(
"trans:%d, %s:%d is executed and need sync to other mnodes"
,
pTrans
->
id
,
mndTransStr
(
pAction
->
stage
),
m
Info
(
"trans:%d, %s:%d is executed and need sync to other mnodes"
,
pTrans
->
id
,
mndTransStr
(
pAction
->
stage
),
pAction
->
id
);
pAction
->
id
);
code
=
mndTransSync
(
pMnode
,
pTrans
);
code
=
mndTransSync
(
pMnode
,
pTrans
);
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
...
@@ -1263,17 +1263,17 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
...
@@ -1263,17 +1263,17 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
mndTransStr
(
pAction
->
stage
),
pAction
->
id
,
terrstr
());
mndTransStr
(
pAction
->
stage
),
pAction
->
id
,
terrstr
());
}
}
}
else
if
(
code
==
TSDB_CODE_ACTION_IN_PROGRESS
)
{
}
else
if
(
code
==
TSDB_CODE_ACTION_IN_PROGRESS
)
{
m
Debug
(
"trans:%d, %s:%d is in progress and wait it finish"
,
pTrans
->
id
,
mndTransStr
(
pAction
->
stage
),
pAction
->
id
);
m
Info
(
"trans:%d, %s:%d is in progress and wait it finish"
,
pTrans
->
id
,
mndTransStr
(
pAction
->
stage
),
pAction
->
id
);
break
;
break
;
}
else
if
(
code
==
pAction
->
retryCode
)
{
}
else
if
(
code
==
pAction
->
retryCode
)
{
m
Debug
(
"trans:%d, %s:%d receive code:0x%x and retry"
,
pTrans
->
id
,
mndTransStr
(
pAction
->
stage
),
pAction
->
id
,
code
);
m
Info
(
"trans:%d, %s:%d receive code:0x%x and retry"
,
pTrans
->
id
,
mndTransStr
(
pAction
->
stage
),
pAction
->
id
,
code
);
taosMsleep
(
300
);
taosMsleep
(
300
);
action
--
;
action
--
;
continue
;
continue
;
}
else
{
}
else
{
terrno
=
code
;
terrno
=
code
;
pTrans
->
code
=
code
;
pTrans
->
code
=
code
;
m
Debug
(
"trans:%d, %s:%d receive code:0x%x and wait another schedule, failedTimes:%d"
,
pTrans
->
id
,
m
Info
(
"trans:%d, %s:%d receive code:0x%x and wait another schedule, failedTimes:%d"
,
pTrans
->
id
,
mndTransStr
(
pAction
->
stage
),
pAction
->
id
,
code
,
pTrans
->
failedTimes
);
mndTransStr
(
pAction
->
stage
),
pAction
->
id
,
code
,
pTrans
->
failedTimes
);
break
;
break
;
}
}
...
@@ -1285,7 +1285,7 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
...
@@ -1285,7 +1285,7 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
static
bool
mndTransPerformPrepareStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
static
bool
mndTransPerformPrepareStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
bool
continueExec
=
true
;
bool
continueExec
=
true
;
pTrans
->
stage
=
TRN_STAGE_REDO_ACTION
;
pTrans
->
stage
=
TRN_STAGE_REDO_ACTION
;
m
Debug
(
"trans:%d, stage from prepare to redoAction"
,
pTrans
->
id
);
m
Info
(
"trans:%d, stage from prepare to redoAction"
,
pTrans
->
id
);
return
continueExec
;
return
continueExec
;
}
}
...
@@ -1304,10 +1304,10 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) {
...
@@ -1304,10 +1304,10 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) {
if
(
code
==
0
)
{
if
(
code
==
0
)
{
pTrans
->
code
=
0
;
pTrans
->
code
=
0
;
pTrans
->
stage
=
TRN_STAGE_COMMIT
;
pTrans
->
stage
=
TRN_STAGE_COMMIT
;
m
Debug
(
"trans:%d, stage from redoAction to commit"
,
pTrans
->
id
);
m
Info
(
"trans:%d, stage from redoAction to commit"
,
pTrans
->
id
);
continueExec
=
true
;
continueExec
=
true
;
}
else
if
(
code
==
TSDB_CODE_ACTION_IN_PROGRESS
)
{
}
else
if
(
code
==
TSDB_CODE_ACTION_IN_PROGRESS
)
{
m
Debug
(
"trans:%d, stage keep on redoAction since %s"
,
pTrans
->
id
,
tstrerror
(
code
));
m
Info
(
"trans:%d, stage keep on redoAction since %s"
,
pTrans
->
id
,
tstrerror
(
code
));
continueExec
=
false
;
continueExec
=
false
;
}
else
{
}
else
{
pTrans
->
failedTimes
++
;
pTrans
->
failedTimes
++
;
...
@@ -1347,7 +1347,7 @@ static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) {
...
@@ -1347,7 +1347,7 @@ static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) {
if
(
code
==
0
)
{
if
(
code
==
0
)
{
pTrans
->
code
=
0
;
pTrans
->
code
=
0
;
pTrans
->
stage
=
TRN_STAGE_COMMIT_ACTION
;
pTrans
->
stage
=
TRN_STAGE_COMMIT_ACTION
;
m
Debug
(
"trans:%d, stage from commit to commitAction"
,
pTrans
->
id
);
m
Info
(
"trans:%d, stage from commit to commitAction"
,
pTrans
->
id
);
continueExec
=
true
;
continueExec
=
true
;
}
else
{
}
else
{
pTrans
->
code
=
terrno
;
pTrans
->
code
=
terrno
;
...
@@ -1366,7 +1366,7 @@ static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans) {
...
@@ -1366,7 +1366,7 @@ static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans) {
if
(
code
==
0
)
{
if
(
code
==
0
)
{
pTrans
->
code
=
0
;
pTrans
->
code
=
0
;
pTrans
->
stage
=
TRN_STAGE_FINISHED
;
pTrans
->
stage
=
TRN_STAGE_FINISHED
;
m
Debug
(
"trans:%d, stage from commitAction to finished"
,
pTrans
->
id
);
m
Info
(
"trans:%d, stage from commitAction to finished"
,
pTrans
->
id
);
continueExec
=
true
;
continueExec
=
true
;
}
else
{
}
else
{
pTrans
->
code
=
terrno
;
pTrans
->
code
=
terrno
;
...
@@ -1384,10 +1384,10 @@ static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) {
...
@@ -1384,10 +1384,10 @@ static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) {
if
(
code
==
0
)
{
if
(
code
==
0
)
{
pTrans
->
stage
=
TRN_STAGE_FINISHED
;
pTrans
->
stage
=
TRN_STAGE_FINISHED
;
m
Debug
(
"trans:%d, stage from undoAction to finished"
,
pTrans
->
id
);
m
Info
(
"trans:%d, stage from undoAction to finished"
,
pTrans
->
id
);
continueExec
=
true
;
continueExec
=
true
;
}
else
if
(
code
==
TSDB_CODE_ACTION_IN_PROGRESS
)
{
}
else
if
(
code
==
TSDB_CODE_ACTION_IN_PROGRESS
)
{
m
Debug
(
"trans:%d, stage keep on undoAction since %s"
,
pTrans
->
id
,
tstrerror
(
code
));
m
Info
(
"trans:%d, stage keep on undoAction since %s"
,
pTrans
->
id
,
tstrerror
(
code
));
continueExec
=
false
;
continueExec
=
false
;
}
else
{
}
else
{
pTrans
->
failedTimes
++
;
pTrans
->
failedTimes
++
;
...
@@ -1406,7 +1406,7 @@ static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) {
...
@@ -1406,7 +1406,7 @@ static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) {
if
(
code
==
0
)
{
if
(
code
==
0
)
{
pTrans
->
stage
=
TRN_STAGE_UNDO_ACTION
;
pTrans
->
stage
=
TRN_STAGE_UNDO_ACTION
;
m
Debug
(
"trans:%d, stage from rollback to undoAction"
,
pTrans
->
id
);
m
Info
(
"trans:%d, stage from rollback to undoAction"
,
pTrans
->
id
);
continueExec
=
true
;
continueExec
=
true
;
}
else
{
}
else
{
pTrans
->
failedTimes
++
;
pTrans
->
failedTimes
++
;
...
@@ -1431,7 +1431,7 @@ static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans) {
...
@@ -1431,7 +1431,7 @@ static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans) {
mError
(
"trans:%d, failed to write sdb since %s"
,
pTrans
->
id
,
terrstr
());
mError
(
"trans:%d, failed to write sdb since %s"
,
pTrans
->
id
,
terrstr
());
}
}
m
Debug
(
"trans:%d, execute finished, code:0x%x, failedTimes:%d"
,
pTrans
->
id
,
pTrans
->
code
,
pTrans
->
failedTimes
);
m
Info
(
"trans:%d, execute finished, code:0x%x, failedTimes:%d"
,
pTrans
->
id
,
pTrans
->
code
,
pTrans
->
failedTimes
);
return
continueExec
;
return
continueExec
;
}
}
...
@@ -1439,7 +1439,7 @@ void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
...
@@ -1439,7 +1439,7 @@ void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
bool
continueExec
=
true
;
bool
continueExec
=
true
;
while
(
continueExec
)
{
while
(
continueExec
)
{
m
Debug
(
"trans:%d, continue to execute, stage:%s"
,
pTrans
->
id
,
mndTransStr
(
pTrans
->
stage
));
m
Info
(
"trans:%d, continue to execute, stage:%s"
,
pTrans
->
id
,
mndTransStr
(
pTrans
->
stage
));
pTrans
->
lastExecTime
=
taosGetTimestampMs
();
pTrans
->
lastExecTime
=
taosGetTimestampMs
();
switch
(
pTrans
->
stage
)
{
switch
(
pTrans
->
stage
)
{
case
TRN_STAGE_PREPARE
:
case
TRN_STAGE_PREPARE
:
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录