Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
1013b061
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
1013b061
编写于
5月 31, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor: make trans support global conflict
上级
9057abeb
变更
6
显示空白变更内容
内联
并排
Showing
6 changed file
with
14 addition
and
15 deletion
+14
-15
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+1
-1
source/dnode/mnode/impl/src/mndConsumer.c
source/dnode/mnode/impl/src/mndConsumer.c
+3
-3
source/dnode/mnode/impl/src/mndOffset.c
source/dnode/mnode/impl/src/mndOffset.c
+1
-1
source/dnode/mnode/impl/src/mndStream.c
source/dnode/mnode/impl/src/mndStream.c
+1
-1
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+6
-7
source/dnode/mnode/impl/src/mndTopic.c
source/dnode/mnode/impl/src/mndTopic.c
+2
-2
未找到文件。
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
1013b061
...
@@ -122,7 +122,7 @@ typedef struct {
...
@@ -122,7 +122,7 @@ typedef struct {
int64_t
dbUid
;
int64_t
dbUid
;
char
dbname
[
TSDB_DB_FNAME_LEN
];
char
dbname
[
TSDB_DB_FNAME_LEN
];
char
lastError
[
TSDB_TRANS_ERROR_LEN
];
char
lastError
[
TSDB_TRANS_ERROR_LEN
];
char
desc
[
TSDB_TRANS_DESC_LEN
];
char
o
[
TSDB_TRANS_DESC_LEN
];
int32_t
startFunc
;
int32_t
startFunc
;
int32_t
stopFunc
;
int32_t
stopFunc
;
int32_t
paramLen
;
int32_t
paramLen
;
...
...
source/dnode/mnode/impl/src/mndConsumer.c
浏览文件 @
1013b061
...
@@ -97,7 +97,7 @@ static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
...
@@ -97,7 +97,7 @@ static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
mndReleaseConsumer
(
pMnode
,
pConsumer
);
mndReleaseConsumer
(
pMnode
,
pConsumer
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_CONFLICT_
GLOBAL
,
pMsg
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_CONFLICT_
NOTHING
,
pMsg
);
if
(
pTrans
==
NULL
)
goto
FAIL
;
if
(
pTrans
==
NULL
)
goto
FAIL
;
if
(
mndSetConsumerCommitLogs
(
pMnode
,
pTrans
,
pConsumerNew
)
!=
0
)
goto
FAIL
;
if
(
mndSetConsumerCommitLogs
(
pMnode
,
pTrans
,
pConsumerNew
)
!=
0
)
goto
FAIL
;
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
goto
FAIL
;
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
goto
FAIL
;
...
@@ -121,7 +121,7 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
...
@@ -121,7 +121,7 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
mndReleaseConsumer
(
pMnode
,
pConsumer
);
mndReleaseConsumer
(
pMnode
,
pConsumer
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_CONFLICT_
GLOBAL
,
pMsg
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_CONFLICT_
NOTHING
,
pMsg
);
if
(
pTrans
==
NULL
)
goto
FAIL
;
if
(
pTrans
==
NULL
)
goto
FAIL
;
if
(
mndSetConsumerCommitLogs
(
pMnode
,
pTrans
,
pConsumerNew
)
!=
0
)
goto
FAIL
;
if
(
mndSetConsumerCommitLogs
(
pMnode
,
pTrans
,
pConsumerNew
)
!=
0
)
goto
FAIL
;
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
goto
FAIL
;
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
goto
FAIL
;
...
@@ -403,7 +403,7 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
...
@@ -403,7 +403,7 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
int32_t
newTopicNum
=
taosArrayGetSize
(
newSub
);
int32_t
newTopicNum
=
taosArrayGetSize
(
newSub
);
// check topic existance
// check topic existance
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_CONFLICT_
GLOBAL
,
pMsg
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_CONFLICT_
NOTHING
,
pMsg
);
if
(
pTrans
==
NULL
)
goto
SUBSCRIBE_OVER
;
if
(
pTrans
==
NULL
)
goto
SUBSCRIBE_OVER
;
for
(
int32_t
i
=
0
;
i
<
newTopicNum
;
i
++
)
{
for
(
int32_t
i
=
0
;
i
<
newTopicNum
;
i
++
)
{
...
...
source/dnode/mnode/impl/src/mndOffset.c
浏览文件 @
1013b061
...
@@ -179,7 +179,7 @@ static int32_t mndProcessCommitOffsetReq(SRpcMsg *pMsg) {
...
@@ -179,7 +179,7 @@ static int32_t mndProcessCommitOffsetReq(SRpcMsg *pMsg) {
tDecodeSMqCMCommitOffsetReq
(
&
decoder
,
&
commitOffsetReq
);
tDecodeSMqCMCommitOffsetReq
(
&
decoder
,
&
commitOffsetReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_CONFLICT_
GLOBAL
,
pMsg
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_CONFLICT_
NOTHING
,
pMsg
);
for
(
int32_t
i
=
0
;
i
<
commitOffsetReq
.
num
;
i
++
)
{
for
(
int32_t
i
=
0
;
i
<
commitOffsetReq
.
num
;
i
++
)
{
SMqOffset
*
pOffset
=
&
commitOffsetReq
.
offsets
[
i
];
SMqOffset
*
pOffset
=
&
commitOffsetReq
.
offsets
[
i
];
...
...
source/dnode/mnode/impl/src/mndStream.c
浏览文件 @
1013b061
...
@@ -402,7 +402,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SRpcMsg *pReq, SCMCreateStreamReq
...
@@ -402,7 +402,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SRpcMsg *pReq, SCMCreateStreamReq
tstrncpy
(
streamObj
.
targetDb
,
pDb
->
name
,
TSDB_DB_FNAME_LEN
);
tstrncpy
(
streamObj
.
targetDb
,
pDb
->
name
,
TSDB_DB_FNAME_LEN
);
}
}
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_CONFLICT_
GLOBAL
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_CONFLICT_
NOTHING
,
pReq
);
if
(
pTrans
==
NULL
)
{
if
(
pTrans
==
NULL
)
{
mError
(
"stream:%s, failed to create since %s"
,
pCreate
->
name
,
terrstr
());
mError
(
"stream:%s, failed to create since %s"
,
pCreate
->
name
,
terrstr
());
return
-
1
;
return
-
1
;
...
...
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
1013b061
...
@@ -393,8 +393,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
...
@@ -393,8 +393,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
mInfo
(
"rebalance calculation completed, rebalanced vg:"
);
mInfo
(
"rebalance calculation completed, rebalanced vg:"
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pOutput
->
rebVgs
);
i
++
)
{
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pOutput
->
rebVgs
);
i
++
)
{
SMqRebOutputVg
*
pOutputRebVg
=
taosArrayGet
(
pOutput
->
rebVgs
,
i
);
SMqRebOutputVg
*
pOutputRebVg
=
taosArrayGet
(
pOutput
->
rebVgs
,
i
);
mInfo
(
"vg
: %d moved from consumer %ld to consumer %ld"
,
pOutputRebVg
->
pVgEp
->
vgId
,
pOutputRebVg
->
oldConsumer
Id
,
mInfo
(
"vg
Id:%d moved from consumer %"
PRId64
" to consumer %"
PRId64
,
pOutputRebVg
->
pVgEp
->
vg
Id
,
pOutputRebVg
->
newConsumerId
);
pOutputRebVg
->
oldConsumerId
,
pOutputRebVg
->
newConsumerId
);
}
}
// 9. clear
// 9. clear
...
@@ -404,10 +404,9 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
...
@@ -404,10 +404,9 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
}
}
static
int32_t
mndPersistRebResult
(
SMnode
*
pMnode
,
SRpcMsg
*
pMsg
,
const
SMqRebOutputObj
*
pOutput
)
{
static
int32_t
mndPersistRebResult
(
SMnode
*
pMnode
,
SRpcMsg
*
pMsg
,
const
SMqRebOutputObj
*
pOutput
)
{
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_CONFLICT_GLOBAL
,
pMsg
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_CONFLICT_NOTHING
,
pMsg
);
if
(
pTrans
==
NULL
)
{
if
(
pTrans
==
NULL
)
return
-
1
;
return
-
1
;
}
// make txn:
// make txn:
// 1. redo action: action to all vg
// 1. redo action: action to all vg
const
SArray
*
rebVgs
=
pOutput
->
rebVgs
;
const
SArray
*
rebVgs
=
pOutput
->
rebVgs
;
...
@@ -623,7 +622,7 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pReq) {
...
@@ -623,7 +622,7 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pReq) {
return
-
1
;
return
-
1
;
}
}
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_CONFLICT_
GLOBAL
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_CONFLICT_
NOTHING
,
pReq
);
if
(
pTrans
==
NULL
)
{
if
(
pTrans
==
NULL
)
{
mError
(
"cgroup: %s on topic:%s, failed to drop since %s"
,
dropReq
.
cgroup
,
dropReq
.
topic
,
terrstr
());
mError
(
"cgroup: %s on topic:%s, failed to drop since %s"
,
dropReq
.
cgroup
,
dropReq
.
topic
,
terrstr
());
return
-
1
;
return
-
1
;
...
...
source/dnode/mnode/impl/src/mndTopic.c
浏览文件 @
1013b061
...
@@ -377,7 +377,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
...
@@ -377,7 +377,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
topicObj
.
withSchema
=
1
;
topicObj
.
withSchema
=
1
;
}
}
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_CONFLICT_
GLOBAL
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_CONFLICT_
NOTHING
,
pReq
);
if
(
pTrans
==
NULL
)
{
if
(
pTrans
==
NULL
)
{
mError
(
"topic:%s, failed to create since %s"
,
pCreate
->
name
,
terrstr
());
mError
(
"topic:%s, failed to create since %s"
,
pCreate
->
name
,
terrstr
());
taosMemoryFreeClear
(
topicObj
.
ast
);
taosMemoryFreeClear
(
topicObj
.
ast
);
...
@@ -545,7 +545,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
...
@@ -545,7 +545,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
}
}
#endif
#endif
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_CONFLICT_
GLOBAL
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_CONFLICT_
NOTHING
,
pReq
);
if
(
pTrans
==
NULL
)
{
if
(
pTrans
==
NULL
)
{
mError
(
"topic:%s, failed to drop since %s"
,
pTopic
->
name
,
terrstr
());
mError
(
"topic:%s, failed to drop since %s"
,
pTopic
->
name
,
terrstr
());
return
-
1
;
return
-
1
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录