Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
30104f30
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
30104f30
编写于
5月 16, 2023
作者:
B
Benguang Zhao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
enh: refactor func names of mndTransPerformFinishedStage and mndTrancCheckConflict
上级
3fa963a5
变更
11
显示空白变更内容
内联
并排
Showing
11 changed file
with
35 addition
and
33 deletion
+35
-33
source/dnode/mnode/impl/inc/mndTrans.h
source/dnode/mnode/impl/inc/mndTrans.h
+4
-2
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+3
-3
source/dnode/mnode/impl/src/mndDnode.c
source/dnode/mnode/impl/src/mndDnode.c
+2
-2
source/dnode/mnode/impl/src/mndIndex.c
source/dnode/mnode/impl/src/mndIndex.c
+3
-3
source/dnode/mnode/impl/src/mndMnode.c
source/dnode/mnode/impl/src/mndMnode.c
+2
-2
source/dnode/mnode/impl/src/mndSma.c
source/dnode/mnode/impl/src/mndSma.c
+2
-2
source/dnode/mnode/impl/src/mndStb.c
source/dnode/mnode/impl/src/mndStb.c
+5
-5
source/dnode/mnode/impl/src/mndStream.c
source/dnode/mnode/impl/src/mndStream.c
+5
-5
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+1
-1
source/dnode/mnode/impl/src/mndTopic.c
source/dnode/mnode/impl/src/mndTopic.c
+1
-1
source/dnode/mnode/impl/src/mndTrans.c
source/dnode/mnode/impl/src/mndTrans.c
+7
-7
未找到文件。
source/dnode/mnode/impl/inc/mndTrans.h
浏览文件 @
30104f30
...
@@ -78,8 +78,10 @@ void mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbnam
...
@@ -78,8 +78,10 @@ void mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbnam
void
mndTransSetSerial
(
STrans
*
pTrans
);
void
mndTransSetSerial
(
STrans
*
pTrans
);
void
mndTransSetParallel
(
STrans
*
pTrans
);
void
mndTransSetParallel
(
STrans
*
pTrans
);
void
mndTransSetOper
(
STrans
*
pTrans
,
EOperType
oper
);
void
mndTransSetOper
(
STrans
*
pTrans
,
EOperType
oper
);
int32_t
mndTrancCheckConflict
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
int32_t
mndTransCheckConflict
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
static
int32_t
mndTrancCheckConflict
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
return
mndTransCheckConflict
(
pMnode
,
pTrans
);
}
int32_t
mndTransPrepare
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
int32_t
mndTransPrepare
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
int32_t
mndTransProcessRsp
(
SRpcMsg
*
pRsp
);
int32_t
mndTransProcessRsp
(
SRpcMsg
*
pRsp
);
void
mndTransPullup
(
SMnode
*
pMnode
);
void
mndTransPullup
(
SMnode
*
pMnode
);
...
...
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
30104f30
...
@@ -572,7 +572,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
...
@@ -572,7 +572,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
mInfo
(
"trans:%d, used to create db:%s"
,
pTrans
->
id
,
pCreate
->
db
);
mInfo
(
"trans:%d, used to create db:%s"
,
pTrans
->
id
,
pCreate
->
db
);
mndTransSetDbName
(
pTrans
,
dbObj
.
name
,
NULL
);
mndTransSetDbName
(
pTrans
,
dbObj
.
name
,
NULL
);
if
(
mndTran
c
CheckConflict
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
if
(
mndTran
s
CheckConflict
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
mndTransSetOper
(
pTrans
,
MND_OPER_CREATE_DB
);
mndTransSetOper
(
pTrans
,
MND_OPER_CREATE_DB
);
if
(
mndSetCreateDbRedoLogs
(
pMnode
,
pTrans
,
&
dbObj
,
pVgroups
)
!=
0
)
goto
_OVER
;
if
(
mndSetCreateDbRedoLogs
(
pMnode
,
pTrans
,
&
dbObj
,
pVgroups
)
!=
0
)
goto
_OVER
;
...
@@ -814,7 +814,7 @@ static int32_t mndAlterDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pOld, SDbObj *p
...
@@ -814,7 +814,7 @@ static int32_t mndAlterDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pOld, SDbObj *p
int32_t
code
=
-
1
;
int32_t
code
=
-
1
;
mndTransSetDbName
(
pTrans
,
pOld
->
name
,
NULL
);
mndTransSetDbName
(
pTrans
,
pOld
->
name
,
NULL
);
if
(
mndTran
c
CheckConflict
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
if
(
mndTran
s
CheckConflict
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
if
(
mndSetAlterDbRedoLogs
(
pMnode
,
pTrans
,
pOld
,
pNew
)
!=
0
)
goto
_OVER
;
if
(
mndSetAlterDbRedoLogs
(
pMnode
,
pTrans
,
pOld
,
pNew
)
!=
0
)
goto
_OVER
;
if
(
mndSetAlterDbCommitLogs
(
pMnode
,
pTrans
,
pOld
,
pNew
)
!=
0
)
goto
_OVER
;
if
(
mndSetAlterDbCommitLogs
(
pMnode
,
pTrans
,
pOld
,
pNew
)
!=
0
)
goto
_OVER
;
...
@@ -1109,7 +1109,7 @@ static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) {
...
@@ -1109,7 +1109,7 @@ static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) {
mInfo
(
"trans:%d start to drop db:%s"
,
pTrans
->
id
,
pDb
->
name
);
mInfo
(
"trans:%d start to drop db:%s"
,
pTrans
->
id
,
pDb
->
name
);
mndTransSetDbName
(
pTrans
,
pDb
->
name
,
NULL
);
mndTransSetDbName
(
pTrans
,
pDb
->
name
,
NULL
);
if
(
mndTran
c
CheckConflict
(
pMnode
,
pTrans
)
!=
0
)
{
if
(
mndTran
s
CheckConflict
(
pMnode
,
pTrans
)
!=
0
)
{
goto
_OVER
;
goto
_OVER
;
}
}
...
...
source/dnode/mnode/impl/src/mndDnode.c
浏览文件 @
30104f30
...
@@ -632,7 +632,7 @@ static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pC
...
@@ -632,7 +632,7 @@ static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pC
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_CONFLICT_GLOBAL
,
pReq
,
"create-dnode"
);
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_CONFLICT_GLOBAL
,
pReq
,
"create-dnode"
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
if
(
pTrans
==
NULL
)
goto
_OVER
;
mInfo
(
"trans:%d, used to create dnode:%s"
,
pTrans
->
id
,
dnodeObj
.
ep
);
mInfo
(
"trans:%d, used to create dnode:%s"
,
pTrans
->
id
,
dnodeObj
.
ep
);
if
(
mndTran
c
CheckConflict
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
if
(
mndTran
s
CheckConflict
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
pRaw
=
mndDnodeActionEncode
(
&
dnodeObj
);
pRaw
=
mndDnodeActionEncode
(
&
dnodeObj
);
if
(
pRaw
==
NULL
||
mndTransAppendCommitlog
(
pTrans
,
pRaw
)
!=
0
)
goto
_OVER
;
if
(
pRaw
==
NULL
||
mndTransAppendCommitlog
(
pTrans
,
pRaw
)
!=
0
)
goto
_OVER
;
...
@@ -889,7 +889,7 @@ static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SM
...
@@ -889,7 +889,7 @@ static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SM
if
(
pTrans
==
NULL
)
goto
_OVER
;
if
(
pTrans
==
NULL
)
goto
_OVER
;
mndTransSetSerial
(
pTrans
);
mndTransSetSerial
(
pTrans
);
mInfo
(
"trans:%d, used to drop dnode:%d, force:%d"
,
pTrans
->
id
,
pDnode
->
id
,
force
);
mInfo
(
"trans:%d, used to drop dnode:%d, force:%d"
,
pTrans
->
id
,
pDnode
->
id
,
force
);
if
(
mndTran
c
CheckConflict
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
if
(
mndTran
s
CheckConflict
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
pRaw
=
mndDnodeActionEncode
(
pDnode
);
pRaw
=
mndDnodeActionEncode
(
pDnode
);
if
(
pRaw
==
NULL
)
goto
_OVER
;
if
(
pRaw
==
NULL
)
goto
_OVER
;
...
...
source/dnode/mnode/impl/src/mndIndex.c
浏览文件 @
30104f30
...
@@ -645,7 +645,7 @@ int32_t mndAddIndexImpl(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pSt
...
@@ -645,7 +645,7 @@ int32_t mndAddIndexImpl(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pSt
// mInfo("trans:%d, used to add index to stb:%s", pTrans->id, pStb->name);
// mInfo("trans:%d, used to add index to stb:%s", pTrans->id, pStb->name);
mndTransSetDbName
(
pTrans
,
pDb
->
name
,
pStb
->
name
);
mndTransSetDbName
(
pTrans
,
pDb
->
name
,
pStb
->
name
);
if
(
mndTran
c
CheckConflict
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
if
(
mndTran
s
CheckConflict
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
mndTransSetSerial
(
pTrans
);
mndTransSetSerial
(
pTrans
);
...
@@ -721,7 +721,7 @@ static int32_t mndDropIdx(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SIdxObj *p
...
@@ -721,7 +721,7 @@ static int32_t mndDropIdx(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SIdxObj *p
mInfo
(
"trans:%d, used to drop idx:%s"
,
pTrans
->
id
,
pIdx
->
name
);
mInfo
(
"trans:%d, used to drop idx:%s"
,
pTrans
->
id
,
pIdx
->
name
);
mndTransSetDbName
(
pTrans
,
pDb
->
name
,
NULL
);
mndTransSetDbName
(
pTrans
,
pDb
->
name
,
NULL
);
if
(
mndTran
c
CheckConflict
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
if
(
mndTran
s
CheckConflict
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
mndTransSetSerial
(
pTrans
);
mndTransSetSerial
(
pTrans
);
if
(
mndSetDropIdxRedoLogs
(
pMnode
,
pTrans
,
pIdx
)
!=
0
)
goto
_OVER
;
if
(
mndSetDropIdxRedoLogs
(
pMnode
,
pTrans
,
pIdx
)
!=
0
)
goto
_OVER
;
...
...
source/dnode/mnode/impl/src/mndMnode.c
浏览文件 @
30104f30
...
@@ -578,7 +578,7 @@ static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
...
@@ -578,7 +578,7 @@ static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
if
(
pTrans
==
NULL
)
goto
_OVER
;
if
(
pTrans
==
NULL
)
goto
_OVER
;
mndTransSetSerial
(
pTrans
);
mndTransSetSerial
(
pTrans
);
mInfo
(
"trans:%d, used to create mnode:%d"
,
pTrans
->
id
,
pCreate
->
dnodeId
);
mInfo
(
"trans:%d, used to create mnode:%d"
,
pTrans
->
id
,
pCreate
->
dnodeId
);
if
(
mndTran
c
CheckConflict
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
if
(
mndTran
s
CheckConflict
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
SMnodeObj
mnodeObj
=
{
0
};
SMnodeObj
mnodeObj
=
{
0
};
mnodeObj
.
id
=
pDnode
->
id
;
mnodeObj
.
id
=
pDnode
->
id
;
...
@@ -732,7 +732,7 @@ static int32_t mndDropMnode(SMnode *pMnode, SRpcMsg *pReq, SMnodeObj *pObj) {
...
@@ -732,7 +732,7 @@ static int32_t mndDropMnode(SMnode *pMnode, SRpcMsg *pReq, SMnodeObj *pObj) {
if
(
pTrans
==
NULL
)
goto
_OVER
;
if
(
pTrans
==
NULL
)
goto
_OVER
;
mndTransSetSerial
(
pTrans
);
mndTransSetSerial
(
pTrans
);
mInfo
(
"trans:%d, used to drop mnode:%d"
,
pTrans
->
id
,
pObj
->
id
);
mInfo
(
"trans:%d, used to drop mnode:%d"
,
pTrans
->
id
,
pObj
->
id
);
if
(
mndTran
c
CheckConflict
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
if
(
mndTran
s
CheckConflict
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
if
(
mndSetDropMnodeInfoToTrans
(
pMnode
,
pTrans
,
pObj
,
false
)
!=
0
)
goto
_OVER
;
if
(
mndSetDropMnodeInfoToTrans
(
pMnode
,
pTrans
,
pObj
,
false
)
!=
0
)
goto
_OVER
;
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
...
...
source/dnode/mnode/impl/src/mndSma.c
浏览文件 @
30104f30
...
@@ -622,7 +622,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
...
@@ -622,7 +622,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_CONFLICT_DB
,
pReq
,
"create-sma"
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_CONFLICT_DB
,
pReq
,
"create-sma"
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
if
(
pTrans
==
NULL
)
goto
_OVER
;
mndTransSetDbName
(
pTrans
,
pDb
->
name
,
NULL
);
mndTransSetDbName
(
pTrans
,
pDb
->
name
,
NULL
);
if
(
mndTran
c
CheckConflict
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
if
(
mndTran
s
CheckConflict
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
mndTransSetSerial
(
pTrans
);
mndTransSetSerial
(
pTrans
);
mInfo
(
"trans:%d, used to create sma:%s stream:%s"
,
pTrans
->
id
,
pCreate
->
name
,
streamObj
.
name
);
mInfo
(
"trans:%d, used to create sma:%s stream:%s"
,
pTrans
->
id
,
pCreate
->
name
,
streamObj
.
name
);
...
@@ -845,7 +845,7 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p
...
@@ -845,7 +845,7 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p
mInfo
(
"trans:%d, used to drop sma:%s"
,
pTrans
->
id
,
pSma
->
name
);
mInfo
(
"trans:%d, used to drop sma:%s"
,
pTrans
->
id
,
pSma
->
name
);
mndTransSetDbName
(
pTrans
,
pDb
->
name
,
NULL
);
mndTransSetDbName
(
pTrans
,
pDb
->
name
,
NULL
);
if
(
mndTran
c
CheckConflict
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
if
(
mndTran
s
CheckConflict
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
mndTransSetSerial
(
pTrans
);
mndTransSetSerial
(
pTrans
);
...
...
source/dnode/mnode/impl/src/mndStb.c
浏览文件 @
30104f30
...
@@ -874,7 +874,7 @@ _OVER:
...
@@ -874,7 +874,7 @@ _OVER:
int32_t
mndAddStbToTrans
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SStbObj
*
pStb
)
{
int32_t
mndAddStbToTrans
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SStbObj
*
pStb
)
{
mndTransSetDbName
(
pTrans
,
pDb
->
name
,
pStb
->
name
);
mndTransSetDbName
(
pTrans
,
pDb
->
name
,
pStb
->
name
);
if
(
mndTran
c
CheckConflict
(
pMnode
,
pTrans
)
!=
0
)
return
-
1
;
if
(
mndTran
s
CheckConflict
(
pMnode
,
pTrans
)
!=
0
)
return
-
1
;
if
(
mndSetCreateStbRedoLogs
(
pMnode
,
pTrans
,
pDb
,
pStb
)
!=
0
)
return
-
1
;
if
(
mndSetCreateStbRedoLogs
(
pMnode
,
pTrans
,
pDb
,
pStb
)
!=
0
)
return
-
1
;
if
(
mndSetCreateStbUndoLogs
(
pMnode
,
pTrans
,
pDb
,
pStb
)
!=
0
)
return
-
1
;
if
(
mndSetCreateStbUndoLogs
(
pMnode
,
pTrans
,
pDb
,
pStb
)
!=
0
)
return
-
1
;
if
(
mndSetCreateStbCommitLogs
(
pMnode
,
pTrans
,
pDb
,
pStb
)
!=
0
)
return
-
1
;
if
(
mndSetCreateStbCommitLogs
(
pMnode
,
pTrans
,
pDb
,
pStb
)
!=
0
)
return
-
1
;
...
@@ -1968,7 +1968,7 @@ static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbOb
...
@@ -1968,7 +1968,7 @@ static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbOb
mInfo
(
"trans:%d, used to alter stb:%s"
,
pTrans
->
id
,
pStb
->
name
);
mInfo
(
"trans:%d, used to alter stb:%s"
,
pTrans
->
id
,
pStb
->
name
);
mndTransSetDbName
(
pTrans
,
pDb
->
name
,
pStb
->
name
);
mndTransSetDbName
(
pTrans
,
pDb
->
name
,
pStb
->
name
);
if
(
mndTran
c
CheckConflict
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
if
(
mndTran
s
CheckConflict
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
if
(
needRsp
)
{
if
(
needRsp
)
{
void
*
pCont
=
NULL
;
void
*
pCont
=
NULL
;
...
@@ -1998,7 +1998,7 @@ static int32_t mndAlterStbAndUpdateTagIdxImp(SMnode *pMnode, SRpcMsg *pReq, SDbO
...
@@ -1998,7 +1998,7 @@ static int32_t mndAlterStbAndUpdateTagIdxImp(SMnode *pMnode, SRpcMsg *pReq, SDbO
mInfo
(
"trans:%d, used to alter stb:%s"
,
pTrans
->
id
,
pStb
->
name
);
mInfo
(
"trans:%d, used to alter stb:%s"
,
pTrans
->
id
,
pStb
->
name
);
mndTransSetDbName
(
pTrans
,
pDb
->
name
,
pStb
->
name
);
mndTransSetDbName
(
pTrans
,
pDb
->
name
,
pStb
->
name
);
if
(
mndTran
c
CheckConflict
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
if
(
mndTran
s
CheckConflict
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
if
(
needRsp
)
{
if
(
needRsp
)
{
void
*
pCont
=
NULL
;
void
*
pCont
=
NULL
;
...
@@ -2242,7 +2242,7 @@ static int32_t mndDropStb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *p
...
@@ -2242,7 +2242,7 @@ static int32_t mndDropStb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *p
mInfo
(
"trans:%d, used to drop stb:%s"
,
pTrans
->
id
,
pStb
->
name
);
mInfo
(
"trans:%d, used to drop stb:%s"
,
pTrans
->
id
,
pStb
->
name
);
mndTransSetDbName
(
pTrans
,
pDb
->
name
,
pStb
->
name
);
mndTransSetDbName
(
pTrans
,
pDb
->
name
,
pStb
->
name
);
if
(
mndTran
c
CheckConflict
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
if
(
mndTran
s
CheckConflict
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
if
(
mndSetDropStbRedoLogs
(
pMnode
,
pTrans
,
pStb
)
!=
0
)
goto
_OVER
;
if
(
mndSetDropStbRedoLogs
(
pMnode
,
pTrans
,
pStb
)
!=
0
)
goto
_OVER
;
if
(
mndSetDropStbCommitLogs
(
pMnode
,
pTrans
,
pStb
)
!=
0
)
goto
_OVER
;
if
(
mndSetDropStbCommitLogs
(
pMnode
,
pTrans
,
pStb
)
!=
0
)
goto
_OVER
;
...
@@ -3298,7 +3298,7 @@ static int32_t mndCheckIndexReq(SCreateTagIndexReq *pReq) {
...
@@ -3298,7 +3298,7 @@ static int32_t mndCheckIndexReq(SCreateTagIndexReq *pReq) {
mInfo("trans:%d, used to add index to stb:%s", pTrans->id, pStb->name);
mInfo("trans:%d, used to add index to stb:%s", pTrans->id, pStb->name);
mndTransSetDbName(pTrans, pDb->name, pStb->name);
mndTransSetDbName(pTrans, pDb->name, pStb->name);
if (mndTran
c
CheckConflict(pMnode, pTrans) != 0) goto _OVER;
if (mndTran
s
CheckConflict(pMnode, pTrans) != 0) goto _OVER;
if (mndSetAlterStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
if (mndSetAlterStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
...
...
source/dnode/mnode/impl/src/mndStream.c
浏览文件 @
30104f30
...
@@ -733,7 +733,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
...
@@ -733,7 +733,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
mInfo
(
"trans:%d, used to create stream:%s"
,
pTrans
->
id
,
createStreamReq
.
name
);
mInfo
(
"trans:%d, used to create stream:%s"
,
pTrans
->
id
,
createStreamReq
.
name
);
mndTransSetDbName
(
pTrans
,
createStreamReq
.
sourceDB
,
streamObj
.
targetDb
);
mndTransSetDbName
(
pTrans
,
createStreamReq
.
sourceDB
,
streamObj
.
targetDb
);
if
(
mndTran
c
CheckConflict
(
pMnode
,
pTrans
)
!=
0
)
{
if
(
mndTran
s
CheckConflict
(
pMnode
,
pTrans
)
!=
0
)
{
mndTransDrop
(
pTrans
);
mndTransDrop
(
pTrans
);
goto
_OVER
;
goto
_OVER
;
}
}
...
@@ -888,7 +888,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
...
@@ -888,7 +888,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "stream-checkpoint");
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "stream-checkpoint");
if (pTrans == NULL) return -1;
if (pTrans == NULL) return -1;
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
if (mndTran
c
CheckConflict(pMnode, pTrans) != 0) {
if (mndTran
s
CheckConflict(pMnode, pTrans) != 0) {
mndReleaseStream(pMnode, pStream);
mndReleaseStream(pMnode, pStream);
mndTransDrop(pTrans);
mndTransDrop(pTrans);
return -1;
return -1;
...
@@ -999,7 +999,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
...
@@ -999,7 +999,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
mInfo
(
"trans:%d, used to drop stream:%s"
,
pTrans
->
id
,
dropReq
.
name
);
mInfo
(
"trans:%d, used to drop stream:%s"
,
pTrans
->
id
,
dropReq
.
name
);
mndTransSetDbName
(
pTrans
,
pStream
->
sourceDb
,
pStream
->
targetDb
);
mndTransSetDbName
(
pTrans
,
pStream
->
sourceDb
,
pStream
->
targetDb
);
if
(
mndTran
c
CheckConflict
(
pMnode
,
pTrans
)
!=
0
)
{
if
(
mndTran
s
CheckConflict
(
pMnode
,
pTrans
)
!=
0
)
{
sdbRelease
(
pMnode
->
pSdb
,
pStream
);
sdbRelease
(
pMnode
->
pSdb
,
pStream
);
mndTransDrop
(
pTrans
);
mndTransDrop
(
pTrans
);
return
-
1
;
return
-
1
;
...
@@ -1367,7 +1367,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
...
@@ -1367,7 +1367,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
mInfo
(
"trans:%d, used to pause stream:%s"
,
pTrans
->
id
,
pauseReq
.
name
);
mInfo
(
"trans:%d, used to pause stream:%s"
,
pTrans
->
id
,
pauseReq
.
name
);
mndTransSetDbName
(
pTrans
,
pStream
->
sourceDb
,
pStream
->
targetDb
);
mndTransSetDbName
(
pTrans
,
pStream
->
sourceDb
,
pStream
->
targetDb
);
if
(
mndTran
c
CheckConflict
(
pMnode
,
pTrans
)
!=
0
)
{
if
(
mndTran
s
CheckConflict
(
pMnode
,
pTrans
)
!=
0
)
{
sdbRelease
(
pMnode
->
pSdb
,
pStream
);
sdbRelease
(
pMnode
->
pSdb
,
pStream
);
mndTransDrop
(
pTrans
);
mndTransDrop
(
pTrans
);
return
-
1
;
return
-
1
;
...
@@ -1475,7 +1475,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
...
@@ -1475,7 +1475,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
mInfo
(
"trans:%d, used to pause stream:%s"
,
pTrans
->
id
,
pauseReq
.
name
);
mInfo
(
"trans:%d, used to pause stream:%s"
,
pTrans
->
id
,
pauseReq
.
name
);
mndTransSetDbName
(
pTrans
,
pStream
->
sourceDb
,
pStream
->
targetDb
);
mndTransSetDbName
(
pTrans
,
pStream
->
sourceDb
,
pStream
->
targetDb
);
if
(
mndTran
c
CheckConflict
(
pMnode
,
pTrans
)
!=
0
)
{
if
(
mndTran
s
CheckConflict
(
pMnode
,
pTrans
)
!=
0
)
{
sdbRelease
(
pMnode
->
pSdb
,
pStream
);
sdbRelease
(
pMnode
->
pSdb
,
pStream
);
mndTransDrop
(
pTrans
);
mndTransDrop
(
pTrans
);
return
-
1
;
return
-
1
;
...
...
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
30104f30
...
@@ -476,7 +476,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
...
@@ -476,7 +476,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
}
}
mndTransSetDbName
(
pTrans
,
pOutput
->
pSub
->
dbName
,
NULL
);
mndTransSetDbName
(
pTrans
,
pOutput
->
pSub
->
dbName
,
NULL
);
if
(
mndTran
c
CheckConflict
(
pMnode
,
pTrans
)
!=
0
)
{
if
(
mndTran
s
CheckConflict
(
pMnode
,
pTrans
)
!=
0
)
{
mndTransDrop
(
pTrans
);
mndTransDrop
(
pTrans
);
return
-
1
;
return
-
1
;
}
}
...
...
source/dnode/mnode/impl/src/mndTopic.c
浏览文件 @
30104f30
...
@@ -740,7 +740,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
...
@@ -740,7 +740,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
}
}
mndTransSetDbName
(
pTrans
,
pTopic
->
db
,
NULL
);
mndTransSetDbName
(
pTrans
,
pTopic
->
db
,
NULL
);
if
(
mndTran
c
CheckConflict
(
pMnode
,
pTrans
)
!=
0
)
{
if
(
mndTran
s
CheckConflict
(
pMnode
,
pTrans
)
!=
0
)
{
mndReleaseTopic
(
pMnode
,
pTopic
);
mndReleaseTopic
(
pMnode
,
pTopic
);
mndTransDrop
(
pTrans
);
mndTransDrop
(
pTrans
);
return
-
1
;
return
-
1
;
...
...
source/dnode/mnode/impl/src/mndTrans.c
浏览文件 @
30104f30
...
@@ -52,7 +52,7 @@ static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans);
...
@@ -52,7 +52,7 @@ static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans);
static
bool
mndTransPerformCommitActionStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
static
bool
mndTransPerformCommitActionStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
static
bool
mndTransPerformCommitStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
static
bool
mndTransPerformCommitStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
static
bool
mndTransPerformRollbackStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
static
bool
mndTransPerformRollbackStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
static
bool
mndTransPerf
ro
mFinishedStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
static
bool
mndTransPerf
or
mFinishedStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
static
bool
mndCannotExecuteTransAction
(
SMnode
*
pMnode
)
{
return
!
pMnode
->
deploy
&&
!
mndIsLeader
(
pMnode
);
}
static
bool
mndCannotExecuteTransAction
(
SMnode
*
pMnode
)
{
return
!
pMnode
->
deploy
&&
!
mndIsLeader
(
pMnode
);
}
static
void
mndTransSendRpcRsp
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
static
void
mndTransSendRpcRsp
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
...
@@ -872,7 +872,7 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) {
...
@@ -872,7 +872,7 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) {
return
conflict
;
return
conflict
;
}
}
int32_t
mndTran
c
CheckConflict
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
int32_t
mndTran
s
CheckConflict
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
if
(
pTrans
->
conflict
==
TRN_CONFLICT_DB
||
pTrans
->
conflict
==
TRN_CONFLICT_DB_INSIDE
)
{
if
(
pTrans
->
conflict
==
TRN_CONFLICT_DB
||
pTrans
->
conflict
==
TRN_CONFLICT_DB_INSIDE
)
{
if
(
strlen
(
pTrans
->
dbname
)
==
0
&&
strlen
(
pTrans
->
stbname
)
==
0
)
{
if
(
strlen
(
pTrans
->
dbname
)
==
0
&&
strlen
(
pTrans
->
stbname
)
==
0
)
{
terrno
=
TSDB_CODE_MND_TRANS_CONFLICT
;
terrno
=
TSDB_CODE_MND_TRANS_CONFLICT
;
...
@@ -891,7 +891,7 @@ int32_t mndTrancCheckConflict(SMnode *pMnode, STrans *pTrans) {
...
@@ -891,7 +891,7 @@ int32_t mndTrancCheckConflict(SMnode *pMnode, STrans *pTrans) {
}
}
int32_t
mndTransPrepare
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
int32_t
mndTransPrepare
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
if
(
mndTran
c
CheckConflict
(
pMnode
,
pTrans
)
!=
0
)
{
if
(
mndTran
s
CheckConflict
(
pMnode
,
pTrans
)
!=
0
)
{
return
-
1
;
return
-
1
;
}
}
...
@@ -1528,7 +1528,7 @@ static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) {
...
@@ -1528,7 +1528,7 @@ static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) {
return
continueExec
;
return
continueExec
;
}
}
static
bool
mndTransPerf
romPreFinished
Stage
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
static
bool
mndTransPerf
ormPreFinish
Stage
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
if
(
mndCannotExecuteTransAction
(
pMnode
))
return
false
;
if
(
mndCannotExecuteTransAction
(
pMnode
))
return
false
;
bool
continueExec
=
true
;
bool
continueExec
=
true
;
...
@@ -1547,7 +1547,7 @@ static bool mndTransPerfromPreFinishedStage(SMnode *pMnode, STrans *pTrans) {
...
@@ -1547,7 +1547,7 @@ static bool mndTransPerfromPreFinishedStage(SMnode *pMnode, STrans *pTrans) {
return
continueExec
;
return
continueExec
;
}
}
static
bool
mndTransPerf
ro
mFinishedStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
static
bool
mndTransPerf
or
mFinishedStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
bool
continueExec
=
false
;
bool
continueExec
=
false
;
SSdbRaw
*
pRaw
=
mndTransActionEncode
(
pTrans
);
SSdbRaw
*
pRaw
=
mndTransActionEncode
(
pTrans
);
...
@@ -1605,14 +1605,14 @@ void mndTransExecute(SMnode *pMnode, STrans *pTrans, bool isLeader) {
...
@@ -1605,14 +1605,14 @@ void mndTransExecute(SMnode *pMnode, STrans *pTrans, bool isLeader) {
break
;
break
;
case
TRN_STAGE_PRE_FINISH
:
case
TRN_STAGE_PRE_FINISH
:
if
(
isLeader
)
{
if
(
isLeader
)
{
continueExec
=
mndTransPerf
romPreFinished
Stage
(
pMnode
,
pTrans
);
continueExec
=
mndTransPerf
ormPreFinish
Stage
(
pMnode
,
pTrans
);
}
else
{
}
else
{
mInfo
(
"trans:%d, can not pre-finish since not leader"
,
pTrans
->
id
);
mInfo
(
"trans:%d, can not pre-finish since not leader"
,
pTrans
->
id
);
continueExec
=
false
;
continueExec
=
false
;
}
}
break
;
break
;
case
TRN_STAGE_FINISHED
:
case
TRN_STAGE_FINISHED
:
continueExec
=
mndTransPerf
ro
mFinishedStage
(
pMnode
,
pTrans
);
continueExec
=
mndTransPerf
or
mFinishedStage
(
pMnode
,
pTrans
);
break
;
break
;
default:
default:
continueExec
=
false
;
continueExec
=
false
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录