Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Apache RocketMQ
Rocketmq
提交
bafd6218
R
Rocketmq
项目概览
Apache RocketMQ
/
Rocketmq
上一次同步 大约 3 年
通知
270
Star
16139
Fork
68
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
bafd6218
编写于
10月 15, 2019
作者:
H
huangli
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
change log level, optimise performance of endTransaction and txCheckTask
上级
fdbd3cb8
变更
2
显示空白变更内容
内联
并排
Showing
2 changed file
with
4 addition
and
4 deletion
+4
-4
broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
...he/rocketmq/broker/processor/EndTransactionProcessor.java
+1
-1
broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java
...er/transaction/queue/TransactionalMessageServiceImpl.java
+3
-3
未找到文件。
broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
浏览文件 @
bafd6218
...
@@ -55,7 +55,7 @@ public class EndTransactionProcessor implements NettyRequestProcessor {
...
@@ -55,7 +55,7 @@ public class EndTransactionProcessor implements NettyRequestProcessor {
final
RemotingCommand
response
=
RemotingCommand
.
createResponseCommand
(
null
);
final
RemotingCommand
response
=
RemotingCommand
.
createResponseCommand
(
null
);
final
EndTransactionRequestHeader
requestHeader
=
final
EndTransactionRequestHeader
requestHeader
=
(
EndTransactionRequestHeader
)
request
.
decodeCommandCustomHeader
(
EndTransactionRequestHeader
.
class
);
(
EndTransactionRequestHeader
)
request
.
decodeCommandCustomHeader
(
EndTransactionRequestHeader
.
class
);
LOGGER
.
info
(
"Transaction request:{}"
,
requestHeader
);
LOGGER
.
debug
(
"Transaction request:{}"
,
requestHeader
);
if
(
BrokerRole
.
SLAVE
==
brokerController
.
getMessageStoreConfig
().
getBrokerRole
())
{
if
(
BrokerRole
.
SLAVE
==
brokerController
.
getMessageStoreConfig
().
getBrokerRole
())
{
response
.
setCode
(
ResponseCode
.
SLAVE_NOT_AVAILABLE
);
response
.
setCode
(
ResponseCode
.
SLAVE_NOT_AVAILABLE
);
LOGGER
.
warn
(
"Message store is slave mode, so end transaction is forbidden. "
);
LOGGER
.
warn
(
"Message store is slave mode, so end transaction is forbidden. "
);
...
...
broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java
浏览文件 @
bafd6218
...
@@ -224,7 +224,7 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ
...
@@ -224,7 +224,7 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ
listener
.
resolveHalfMsg
(
msgExt
);
listener
.
resolveHalfMsg
(
msgExt
);
}
else
{
}
else
{
pullResult
=
fillOpRemoveMap
(
removeMap
,
opQueue
,
pullResult
.
getNextBeginOffset
(),
halfOffset
,
doneOpOffset
);
pullResult
=
fillOpRemoveMap
(
removeMap
,
opQueue
,
pullResult
.
getNextBeginOffset
(),
halfOffset
,
doneOpOffset
);
log
.
info
(
"The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}"
,
i
,
log
.
debug
(
"The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}"
,
i
,
messageQueue
,
pullResult
);
messageQueue
,
pullResult
);
continue
;
continue
;
}
}
...
@@ -293,7 +293,7 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ
...
@@ -293,7 +293,7 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ
}
}
for
(
MessageExt
opMessageExt
:
opMsg
)
{
for
(
MessageExt
opMessageExt
:
opMsg
)
{
Long
queueOffset
=
getLong
(
new
String
(
opMessageExt
.
getBody
(),
TransactionalMessageUtil
.
charset
));
Long
queueOffset
=
getLong
(
new
String
(
opMessageExt
.
getBody
(),
TransactionalMessageUtil
.
charset
));
log
.
info
(
"Topic: {} tags: {}, OpOffset: {}, HalfOffset: {}"
,
opMessageExt
.
getTopic
(),
log
.
debug
(
"Topic: {} tags: {}, OpOffset: {}, HalfOffset: {}"
,
opMessageExt
.
getTopic
(),
opMessageExt
.
getTags
(),
opMessageExt
.
getQueueOffset
(),
queueOffset
);
opMessageExt
.
getTags
(),
opMessageExt
.
getQueueOffset
(),
queueOffset
);
if
(
TransactionalMessageUtil
.
REMOVETAG
.
equals
(
opMessageExt
.
getTags
()))
{
if
(
TransactionalMessageUtil
.
REMOVETAG
.
equals
(
opMessageExt
.
getTags
()))
{
if
(
queueOffset
<
miniOffset
)
{
if
(
queueOffset
<
miniOffset
)
{
...
@@ -461,7 +461,7 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ
...
@@ -461,7 +461,7 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ
@Override
@Override
public
boolean
deletePrepareMessage
(
MessageExt
msgExt
)
{
public
boolean
deletePrepareMessage
(
MessageExt
msgExt
)
{
if
(
this
.
transactionalMessageBridge
.
putOpMessage
(
msgExt
,
TransactionalMessageUtil
.
REMOVETAG
))
{
if
(
this
.
transactionalMessageBridge
.
putOpMessage
(
msgExt
,
TransactionalMessageUtil
.
REMOVETAG
))
{
log
.
info
(
"Transaction op message write successfully. messageId={}, queueId={} msgExt:{}"
,
msgExt
.
getMsgId
(),
msgExt
.
getQueueId
(),
msgExt
);
log
.
debug
(
"Transaction op message write successfully. messageId={}, queueId={} msgExt:{}"
,
msgExt
.
getMsgId
(),
msgExt
.
getQueueId
(),
msgExt
);
return
true
;
return
true
;
}
else
{
}
else
{
log
.
error
(
"Transaction op message write failed. messageId is {}, queueId is {}"
,
msgExt
.
getMsgId
(),
msgExt
.
getQueueId
());
log
.
error
(
"Transaction op message write failed. messageId is {}, queueId is {}"
,
msgExt
.
getMsgId
(),
msgExt
.
getQueueId
());
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录