Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Apache RocketMQ
Rocketmq
提交
c0ffc5bd
R
Rocketmq
项目概览
Apache RocketMQ
/
Rocketmq
上一次同步 3 年多
通知
295
Star
16140
Fork
68
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
c0ffc5bd
编写于
11月 09, 2021
作者:
D
dongeforever
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Finish the producer to get the real broker addr
上级
40d86269
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
90 addition
and
264 deletion
+90
-264
client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
...ain/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
+13
-11
client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
...java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+6
-30
client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
...ient/impl/consumer/ConsumeMessageConcurrentlyService.java
+1
-1
client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
...ketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+1
-0
client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
...ketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+19
-2
client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
.../apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
+4
-2
client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
...apache/rocketmq/client/impl/factory/MQClientInstance.java
+37
-1
client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
.../rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+8
-217
common/src/main/java/org/apache/rocketmq/common/sysflag/MessageSysFlag.java
...va/org/apache/rocketmq/common/sysflag/MessageSysFlag.java
+1
-0
未找到文件。
client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
浏览文件 @
c0ffc5bd
...
...
@@ -191,10 +191,10 @@ public class MQAdminImpl {
if
(
logicalQueueRouteData
!=
null
)
{
mq
=
logicalQueueRouteData
.
getMessageQueue
();
}
String
brokerAddr
=
this
.
mQClientFactory
.
findBrokerAddressInPublish
(
mq
.
getBrokerName
()
);
String
brokerAddr
=
this
.
mQClientFactory
.
findBrokerAddressInPublish
(
mq
);
if
(
null
==
brokerAddr
)
{
this
.
mQClientFactory
.
updateTopicRouteInfoFromNameServer
(
mq
.
getTopic
());
brokerAddr
=
this
.
mQClientFactory
.
findBrokerAddressInPublish
(
mq
.
getBrokerName
()
);
brokerAddr
=
this
.
mQClientFactory
.
findBrokerAddressInPublish
(
mq
);
}
if
(
brokerAddr
!=
null
)
{
...
...
@@ -227,10 +227,10 @@ public class MQAdminImpl {
previousQueueRouteData
=
maxQueueRouteData
;
mq
=
maxQueueRouteData
.
getMessageQueue
();
}
String
brokerAddr
=
this
.
mQClientFactory
.
findBrokerAddressInPublish
(
mq
.
getBrokerName
()
);
String
brokerAddr
=
this
.
mQClientFactory
.
findBrokerAddressInPublish
(
mq
);
if
(
null
==
brokerAddr
)
{
this
.
mQClientFactory
.
updateTopicRouteInfoFromNameServer
(
topic
);
brokerAddr
=
this
.
mQClientFactory
.
findBrokerAddressInPublish
(
mq
.
getBrokerName
()
);
brokerAddr
=
this
.
mQClientFactory
.
findBrokerAddressInPublish
(
mq
);
}
if
(
brokerAddr
!=
null
)
{
...
...
@@ -238,7 +238,8 @@ public class MQAdminImpl {
long
offset
=
this
.
mQClientFactory
.
getMQClientAPIImpl
().
getMaxOffset
(
brokerAddr
,
topic
,
mq
.
getQueueId
(),
committed
,
maxQueueRouteData
!=
null
,
timeoutMillis
);
return
correctLogicalQueueOffset
(
offset
,
maxQueueRouteData
);
}
catch
(
MQRedirectException
e
)
{
this
.
mQClientFactory
.
updateTopicRouteInfoFromNameServer
(
topic
,
false
,
null
,
Collections
.
singleton
(
mq
.
getQueueId
()));
//TODO
//this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, false, null, Collections.singleton(mq.getQueueId()));
continue
;
}
catch
(
Exception
e
)
{
throw
new
MQClientException
(
"Invoke Broker["
+
brokerAddr
+
"] exception"
,
e
);
...
...
@@ -255,10 +256,10 @@ public class MQAdminImpl {
mq
=
minQueueRouteData
.
getMessageQueue
();
}
String
brokerAddr
=
this
.
mQClientFactory
.
findBrokerAddressInPublish
(
mq
.
getBrokerName
()
);
String
brokerAddr
=
this
.
mQClientFactory
.
findBrokerAddressInPublish
(
mq
);
if
(
null
==
brokerAddr
)
{
this
.
mQClientFactory
.
updateTopicRouteInfoFromNameServer
(
mq
.
getTopic
());
brokerAddr
=
this
.
mQClientFactory
.
findBrokerAddressInPublish
(
mq
.
getBrokerName
()
);
brokerAddr
=
this
.
mQClientFactory
.
findBrokerAddressInPublish
(
mq
);
}
if
(
brokerAddr
!=
null
)
{
...
...
@@ -281,10 +282,11 @@ public class MQAdminImpl {
topicRouteData
=
this
.
mQClientFactory
.
queryTopicRouteData
(
mq
.
getTopic
());
}
if
(
topicRouteData
!=
null
)
{
LogicalQueuesInfo
logicalQueuesInfo
=
topicRouteData
.
getLogicalQueuesInfo
();
//TODO
/*LogicalQueuesInfo logicalQueuesInfo = topicRouteData.getLogicalQueuesInfo();
if (logicalQueuesInfo != null) {
return logicalQueuesInfo.get(mq.getQueueId());
}
}
*/
}
}
return
null
;
...
...
@@ -296,10 +298,10 @@ public class MQAdminImpl {
mq
=
minQueueRouteData
.
getMessageQueue
();
}
String
brokerAddr
=
this
.
mQClientFactory
.
findBrokerAddressInPublish
(
mq
.
getBrokerName
()
);
String
brokerAddr
=
this
.
mQClientFactory
.
findBrokerAddressInPublish
(
mq
);
if
(
null
==
brokerAddr
)
{
this
.
mQClientFactory
.
updateTopicRouteInfoFromNameServer
(
mq
.
getTopic
());
brokerAddr
=
this
.
mQClientFactory
.
findBrokerAddressInPublish
(
mq
.
getBrokerName
()
);
brokerAddr
=
this
.
mQClientFactory
.
findBrokerAddressInPublish
(
mq
);
}
if
(
brokerAddr
!=
null
)
{
...
...
client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
浏览文件 @
c0ffc5bd
...
...
@@ -624,13 +624,9 @@ public class MQClientAPIImpl {
producer
.
updateFaultItem
(
brokerName
,
System
.
currentTimeMillis
()
-
responseFuture
.
getBeginTimestamp
(),
false
);
}
catch
(
Exception
e
)
{
if
(
e
instanceof
MQRedirectException
)
{
sendCallback
.
onException
(
e
);
}
else
{
producer
.
updateFaultItem
(
brokerName
,
System
.
currentTimeMillis
()
-
responseFuture
.
getBeginTimestamp
(),
true
);
onExceptionImpl
(
brokerName
,
msg
,
timeoutMillis
-
cost
,
request
,
sendCallback
,
topicPublishInfo
,
instance
,
retryTimesWhenSendFailed
,
times
,
e
,
context
,
false
,
producer
);
}
producer
.
updateFaultItem
(
brokerName
,
System
.
currentTimeMillis
()
-
responseFuture
.
getBeginTimestamp
(),
true
);
onExceptionImpl
(
brokerName
,
msg
,
timeoutMillis
-
cost
,
request
,
sendCallback
,
topicPublishInfo
,
instance
,
retryTimesWhenSendFailed
,
times
,
e
,
context
,
false
,
producer
);
}
}
else
{
producer
.
updateFaultItem
(
brokerName
,
System
.
currentTimeMillis
()
-
responseFuture
.
getBeginTimestamp
(),
true
);
...
...
@@ -672,7 +668,7 @@ public class MQClientAPIImpl {
String
retryBrokerName
=
brokerName
;
//by default, it will send to the same broker
if
(
topicPublishInfo
!=
null
)
{
//select one message queue accordingly, in order to determine which broker to send
MessageQueue
mqChosen
=
producer
.
selectOneMessageQueue
(
topicPublishInfo
,
brokerName
);
retryBrokerName
=
mqChosen
.
getBrokerName
(
);
retryBrokerName
=
instance
.
getBrokerNameFromMessageQueue
(
mqChosen
);
}
String
addr
=
instance
.
findBrokerAddressInPublish
(
retryBrokerName
);
log
.
warn
(
String
.
format
(
"async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}"
,
tmp
,
msg
.
getTopic
(),
addr
,
...
...
@@ -709,11 +705,6 @@ public class MQClientAPIImpl {
final
RemotingCommand
response
,
final
String
addr
)
throws
MQBrokerException
,
RemotingCommandException
{
HashMap
<
String
,
String
>
extFields
=
response
.
getExtFields
();
if
(
extFields
!=
null
&&
extFields
.
containsKey
(
MessageConst
.
PROPERTY_REDIRECT
))
{
throw
new
MQRedirectException
(
response
.
getBody
());
}
SendStatus
sendStatus
;
switch
(
response
.
getCode
())
{
case
ResponseCode
.
FLUSH_DISK_TIMEOUT
:
{
...
...
@@ -962,11 +953,6 @@ public class MQClientAPIImpl {
private
PullResult
processPullResponse
(
final
RemotingCommand
response
,
final
String
addr
)
throws
MQBrokerException
,
RemotingCommandException
{
HashMap
<
String
,
String
>
extFields
=
response
.
getExtFields
();
if
(
extFields
!=
null
&&
extFields
.
containsKey
(
MessageConst
.
PROPERTY_REDIRECT
))
{
throw
new
MQRedirectException
(
response
.
getBody
());
}
PullStatus
pullStatus
=
PullStatus
.
NO_NEW_MSG
;
switch
(
response
.
getCode
())
{
case
ResponseCode
.
SUCCESS
:
...
...
@@ -981,6 +967,7 @@ public class MQClientAPIImpl {
case
ResponseCode
.
PULL_OFFSET_MOVED
:
pullStatus
=
PullStatus
.
OFFSET_ILLEGAL
;
break
;
default
:
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
(),
addr
);
}
...
...
@@ -1649,15 +1636,8 @@ public class MQClientAPIImpl {
public
TopicRouteData
getTopicRouteInfoFromNameServer
(
final
String
topic
,
final
long
timeoutMillis
,
boolean
allowTopicNotExist
)
throws
MQClientException
,
InterruptedException
,
RemotingTimeoutException
,
RemotingSendRequestException
,
RemotingConnectException
{
return
getTopicRouteInfoFromNameServer
(
topic
,
timeoutMillis
,
allowTopicNotExist
,
null
);
}
public
TopicRouteData
getTopicRouteInfoFromNameServer
(
final
String
topic
,
final
long
timeoutMillis
,
boolean
allowTopicNotExist
,
Set
<
Integer
>
logicalQueueIdsFilter
)
throws
MQClientException
,
InterruptedException
,
RemotingTimeoutException
,
RemotingSendRequestException
,
RemotingConnectException
{
GetRouteInfoRequestHeader
requestHeader
=
new
GetRouteInfoRequestHeader
();
requestHeader
.
setTopic
(
topic
);
requestHeader
.
setSysFlag
(
MessageSysFlag
.
LOGICAL_QUEUE_FLAG
);
requestHeader
.
setLogicalQueueIdsFilter
(
logicalQueueIdsFilter
);
RemotingCommand
request
=
RemotingCommand
.
createRequestCommand
(
RequestCode
.
GET_ROUTEINFO_BY_TOPIC
,
requestHeader
);
...
...
@@ -1674,11 +1654,7 @@ public class MQClientAPIImpl {
case
ResponseCode
.
SUCCESS
:
{
byte
[]
body
=
response
.
getBody
();
if
(
body
!=
null
)
{
return
fromNullable
(
RemotingSerializable
.
decode
(
body
,
TopicRouteDataNameSrv
.
class
)).
transform
(
new
Function
<
TopicRouteDataNameSrv
,
TopicRouteData
>()
{
@Override
public
TopicRouteData
apply
(
TopicRouteDataNameSrv
srv
)
{
return
srv
.
toTopicRouteData
();
}
}).
orNull
();
return
TopicRouteData
.
decode
(
body
,
TopicRouteData
.
class
);
}
}
default
:
...
...
client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
浏览文件 @
c0ffc5bd
...
...
@@ -306,7 +306,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
// Wrap topic with namespace before sending back message.
msg
.
setTopic
(
this
.
defaultMQPushConsumer
.
withNamespace
(
msg
.
getTopic
()));
try
{
this
.
defaultMQPushConsumerImpl
.
sendMessageBack
(
msg
,
delayLevel
,
context
.
getMessageQueue
().
getBrokerName
(
));
this
.
defaultMQPushConsumerImpl
.
sendMessageBack
(
msg
,
delayLevel
,
this
.
defaultMQPushConsumer
.
queueWithNamespace
(
context
.
getMessageQueue
()
));
return
true
;
}
catch
(
Exception
e
)
{
log
.
error
(
"sendMessageBack exception, group: "
+
this
.
consumerGroup
+
" msg: "
+
msg
.
toString
(),
e
);
...
...
client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
浏览文件 @
c0ffc5bd
...
...
@@ -574,6 +574,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
this
.
offsetStore
.
updateConsumeOffsetToBroker
(
mq
,
offset
,
isOneway
);
}
@Deprecated
public
void
sendMessageBack
(
MessageExt
msg
,
int
delayLevel
,
final
String
brokerName
,
String
consumerGroup
)
throws
RemotingException
,
MQBrokerException
,
InterruptedException
,
MQClientException
{
try
{
...
...
client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
浏览文件 @
c0ffc5bd
...
...
@@ -704,11 +704,28 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
log
.
info
(
"resume this consumer, {}"
,
this
.
defaultMQPushConsumer
.
getConsumerGroup
());
}
@Deprecated
public
void
sendMessageBack
(
MessageExt
msg
,
int
delayLevel
,
final
String
brokerName
)
throws
RemotingException
,
MQBrokerException
,
InterruptedException
,
MQClientException
{
sendMessageBack
(
msg
,
delayLevel
,
brokerName
,
null
);
}
public
void
sendMessageBack
(
MessageExt
msg
,
int
delayLevel
,
final
MessageQueue
mq
)
throws
RemotingException
,
MQBrokerException
,
InterruptedException
,
MQClientException
{
sendMessageBack
(
msg
,
delayLevel
,
null
,
mq
);
}
private
void
sendMessageBack
(
MessageExt
msg
,
int
delayLevel
,
final
String
brokerName
,
final
MessageQueue
mq
)
throws
RemotingException
,
MQBrokerException
,
InterruptedException
,
MQClientException
{
try
{
String
brokerAddr
=
(
null
!=
brokerName
)
?
this
.
mQClientFactory
.
findBrokerAddressInPublish
(
brokerName
)
:
RemotingHelper
.
parseSocketAddressAddr
(
msg
.
getStoreHost
());
String
brokerAddr
=
null
;
if
(
null
!=
mq
)
{
brokerAddr
=
this
.
mQClientFactory
.
findBrokerAddressInPublish
(
mq
);
}
else
if
(
null
!=
brokerName
)
{
brokerAddr
=
this
.
mQClientFactory
.
findBrokerAddressInPublish
(
brokerName
);
}
else
{
RemotingHelper
.
parseSocketAddressAddr
(
msg
.
getStoreHost
());
}
this
.
mQClientFactory
.
getMQClientAPIImpl
().
consumerSendMessageBack
(
brokerAddr
,
msg
,
this
.
defaultMQPushConsumer
.
getConsumerGroup
(),
delayLevel
,
5000
,
getMaxReconsumeTimes
());
}
catch
(
Exception
e
)
{
...
...
client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
浏览文件 @
c0ffc5bd
...
...
@@ -452,7 +452,8 @@ public class PullAPIWrapper {
private
void
buildLogicalQueuesInfo
()
{
TopicRouteData
topicRouteData
=
PullAPIWrapper
.
this
.
mQClientFactory
.
queryTopicRouteData
(
mq
.
getTopic
());
if
(
topicRouteData
!=
null
)
{
this
.
logicalQueuesInfo
=
topicRouteData
.
getLogicalQueuesInfo
();
//TODO
//this.logicalQueuesInfo = topicRouteData.getLogicalQueuesInfo();
}
}
...
...
@@ -623,7 +624,8 @@ public class PullAPIWrapper {
log
.
warn
(
"LogicalQueueContext.processResponseBody {} update exception, fallback to updateTopicRouteInfoFromNameServer"
,
this
.
logicalQueueRouteData
,
e
);
}
}
PullAPIWrapper
.
this
.
mQClientFactory
.
updateTopicRouteInfoFromNameServer
(
mq
.
getTopic
(),
false
,
null
,
Collections
.
singleton
(
this
.
mq
.
getQueueId
()));
//TODO
//PullAPIWrapper.this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic(), false, null, Collections.singleton(this.mq.getQueueId()));
this
.
buildLogicalQueuesInfo
();
}
}
...
...
client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
浏览文件 @
c0ffc5bd
...
...
@@ -1046,6 +1046,24 @@ public class MQClientInstance {
return
this
.
consumerTable
.
get
(
group
);
}
public
String
getBrokerNameFromMessageQueue
(
final
MessageQueue
mq
)
{
if
(
topicEndPointsTable
.
get
(
mq
.
getTopic
())
!=
null
&&
!
topicEndPointsTable
.
get
(
mq
.
getTopic
()).
isEmpty
())
{
return
topicEndPointsTable
.
get
(
mq
.
getTopic
()).
get
(
mq
);
}
return
mq
.
getBrokerName
();
}
public
FindBrokerResult
findBrokerAddressInAdmin
(
final
MessageQueue
mq
)
{
String
brokerName
=
getBrokerNameFromMessageQueue
(
mq
);
if
(
brokerName
==
null
)
{
return
null
;
}
else
{
return
findBrokerAddressInAdmin
(
brokerName
);
}
}
public
FindBrokerResult
findBrokerAddressInAdmin
(
final
String
brokerName
)
{
String
brokerAddr
=
null
;
boolean
slave
=
false
;
...
...
@@ -1076,6 +1094,15 @@ public class MQClientInstance {
return
null
;
}
public
String
findBrokerAddressInPublish
(
final
MessageQueue
mq
)
{
String
brokerName
=
getBrokerNameFromMessageQueue
(
mq
);
if
(
brokerName
==
null
)
{
return
null
;
}
else
{
return
findBrokerAddressInPublish
(
brokerName
);
}
}
//This is used for retry only
public
String
findBrokerAddressInPublish
(
final
String
brokerName
)
{
HashMap
<
Long
/* brokerId */
,
String
/* address */
>
map
=
this
.
brokerAddrTable
.
get
(
brokerName
);
if
(
map
!=
null
&&
!
map
.
isEmpty
())
{
...
...
@@ -1085,6 +1112,15 @@ public class MQClientInstance {
return
null
;
}
public
FindBrokerResult
findBrokerAddressInSubscribe
(
final
MessageQueue
mq
,
final
long
brokerId
,
final
boolean
onlyThisBroker
)
{
String
brokerName
=
getBrokerNameFromMessageQueue
(
mq
);
if
(
brokerName
==
null
)
{
return
null
;
}
else
{
return
findBrokerAddressInSubscribe
(
brokerName
,
brokerId
,
onlyThisBroker
);
}
}
public
FindBrokerResult
findBrokerAddressInSubscribe
(
final
String
brokerName
,
final
long
brokerId
,
...
...
@@ -1120,7 +1156,7 @@ public class MQClientInstance {
return
null
;
}
p
ublic
int
findBrokerVersion
(
String
brokerName
,
String
brokerAddr
)
{
p
rivate
int
findBrokerVersion
(
String
brokerName
,
String
brokerAddr
)
{
if
(
this
.
brokerVersionTable
.
containsKey
(
brokerName
))
{
if
(
this
.
brokerVersionTable
.
get
(
brokerName
).
containsKey
(
brokerAddr
))
{
return
this
.
brokerVersionTable
.
get
(
brokerName
).
get
(
brokerAddr
);
...
...
client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
浏览文件 @
c0ffc5bd
...
...
@@ -730,45 +730,15 @@ public class DefaultMQProducerImpl implements MQProducerInner {
final
SendCallback
sendCallback
,
final
TopicPublishInfo
topicPublishInfo
,
final
long
timeout
)
throws
MQClientException
,
RemotingException
,
MQBrokerException
,
InterruptedException
{
if
(
MixAll
.
LOGICAL_QUEUE_MOCK_BROKER_NAME
.
equals
(
mq
.
getBrokerName
()))
{
LogicalQueueSendContext
logicalQueueContext
=
new
LogicalQueueSendContext
(
msg
,
mq
,
communicationMode
,
sendCallback
,
topicPublishInfo
,
timeout
);
while
(
true
)
{
try
{
SendResult
sendResult
=
this
.
sendKernelImplWithoutRetry
(
msg
,
logicalQueueContext
.
getModifiedMessageQueue
(),
communicationMode
,
logicalQueueContext
.
wrapSendCallback
(),
topicPublishInfo
,
timeout
);
return
logicalQueueContext
.
wrapSendResult
(
sendResult
);
}
catch
(
MQRedirectException
e
)
{
if
(!
logicalQueueContext
.
shouldRetry
(
e
))
{
throw
new
MQBrokerException
(
ResponseCode
.
SYSTEM_ERROR
,
"redirect"
);
}
}
catch
(
RemotingException
e
)
{
if
(!
logicalQueueContext
.
shouldRetry
(
e
))
{
throw
e
;
}
}
}
}
else
{
return
sendKernelImplWithoutRetry
(
msg
,
mq
,
communicationMode
,
sendCallback
,
topicPublishInfo
,
timeout
);
}
}
private
SendResult
sendKernelImplWithoutRetry
(
final
Message
msg
,
final
MessageQueue
mq
,
final
CommunicationMode
communicationMode
,
SendCallback
sendCallback
,
final
TopicPublishInfo
topicPublishInfo
,
final
long
timeout
)
throws
MQClientException
,
RemotingException
,
MQBrokerException
,
InterruptedException
{
long
beginStartTime
=
System
.
currentTimeMillis
();
String
brokerAddr
=
this
.
mQClientFactory
.
findBrokerAddressInPublish
(
mq
.
getBrokerName
()
);
String
brokerAddr
=
this
.
mQClientFactory
.
findBrokerAddressInPublish
(
mq
);
if
(
null
==
brokerAddr
)
{
tryToFindTopicPublishInfo
(
mq
.
getTopic
());
brokerAddr
=
this
.
mQClientFactory
.
findBrokerAddressInPublish
(
mq
.
getBrokerName
()
);
brokerAddr
=
this
.
mQClientFactory
.
findBrokerAddressInPublish
(
mq
);
}
String
brokerName
=
this
.
mQClientFactory
.
getBrokerNameFromMessageQueue
(
mq
);
SendMessageContext
context
=
null
;
if
(
brokerAddr
!=
null
)
{
brokerAddr
=
MixAll
.
brokerVIPChannel
(
this
.
defaultMQProducer
.
isSendMessageWithVIPChannel
(),
brokerAddr
);
...
...
@@ -798,10 +768,6 @@ public class DefaultMQProducerImpl implements MQProducerInner {
sysFlag
|=
MessageSysFlag
.
TRANSACTION_PREPARED_TYPE
;
}
if
(!
CommunicationMode
.
ONEWAY
.
equals
(
communicationMode
))
{
sysFlag
|=
MessageSysFlag
.
LOGICAL_QUEUE_FLAG
;
}
if
(
hasCheckForbiddenHook
())
{
CheckForbiddenContext
checkForbiddenContext
=
new
CheckForbiddenContext
();
checkForbiddenContext
.
setNameSrvAddr
(
this
.
defaultMQProducer
.
getNamesrvAddr
());
...
...
@@ -890,7 +856,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
sendResult
=
this
.
mQClientFactory
.
getMQClientAPIImpl
().
sendMessage
(
brokerAddr
,
mq
.
getBrokerName
()
,
brokerName
,
tmpMessage
,
requestHeader
,
timeout
-
costTimeAsync
,
...
...
@@ -910,7 +876,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
sendResult
=
this
.
mQClientFactory
.
getMQClientAPIImpl
().
sendMessage
(
brokerAddr
,
mq
.
getBrokerName
()
,
brokerName
,
msg
,
requestHeader
,
timeout
-
costTimeSync
,
...
...
@@ -941,7 +907,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
}
throw
new
MQClientException
(
"The broker["
+
mq
.
getBrokerName
()
+
"] not exist"
,
null
);
throw
new
MQClientException
(
"The broker["
+
brokerName
+
"] not exist"
,
null
);
}
public
MQClientInstance
getmQClientFactory
()
{
...
...
@@ -1042,7 +1008,6 @@ public class DefaultMQProducerImpl implements MQProducerInner {
executeEndTransactionHook
(
context
);
}
}
/**
* DEFAULT ONEWAY -------------------------------------------------------
*/
...
...
@@ -1377,7 +1342,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
id
=
MessageDecoder
.
decodeMessageId
(
sendResult
.
getMsgId
());
}
String
transactionId
=
sendResult
.
getTransactionId
();
final
String
brokerAddr
=
this
.
mQClientFactory
.
findBrokerAddressInPublish
(
sendResult
.
getMessageQueue
()
.
getBrokerName
()
);
final
String
brokerAddr
=
this
.
mQClientFactory
.
findBrokerAddressInPublish
(
sendResult
.
getMessageQueue
());
EndTransactionRequestHeader
requestHeader
=
new
EndTransactionRequestHeader
();
requestHeader
.
setTransactionId
(
transactionId
);
requestHeader
.
setCommitLogOffset
(
id
.
getOffset
());
...
...
@@ -1682,178 +1647,4 @@ public class DefaultMQProducerImpl implements MQProducerInner {
public
DefaultMQProducer
getDefaultMQProducer
()
{
return
defaultMQProducer
;
}
private
class
LogicalQueueSendContext
implements
SendCallback
{
private
final
Message
msg
;
private
final
MessageQueue
mq
;
private
final
CommunicationMode
communicationMode
;
private
final
SendCallback
sendCallback
;
private
final
TopicPublishInfo
topicPublishInfo
;
private
final
long
timeout
;
private
volatile
LogicalQueuesInfo
logicalQueuesInfo
;
private
volatile
LogicalQueueRouteData
writableQueueRouteData
;
private
final
AtomicInteger
retry
=
new
AtomicInteger
();
public
LogicalQueueSendContext
(
Message
msg
,
MessageQueue
mq
,
CommunicationMode
communicationMode
,
SendCallback
sendCallback
,
TopicPublishInfo
topicPublishInfo
,
long
timeout
)
{
this
.
msg
=
msg
;
this
.
mq
=
mq
;
this
.
communicationMode
=
communicationMode
;
this
.
sendCallback
=
sendCallback
;
this
.
topicPublishInfo
=
topicPublishInfo
;
this
.
timeout
=
timeout
;
if
(
topicPublishInfo
==
null
)
{
topicPublishInfo
=
DefaultMQProducerImpl
.
this
.
tryToFindTopicPublishInfo
(
mq
.
getTopic
());
}
if
(
topicPublishInfo
!=
null
)
{
this
.
logicalQueuesInfo
=
topicPublishInfo
.
getTopicRouteData
().
getLogicalQueuesInfo
();
}
else
{
this
.
logicalQueuesInfo
=
null
;
}
}
private
boolean
notUsingLogicalQueue
()
{
return
!
Objects
.
equal
(
mq
.
getBrokerName
(),
MixAll
.
LOGICAL_QUEUE_MOCK_BROKER_NAME
)
||
this
.
logicalQueuesInfo
==
null
;
}
public
MessageQueue
getModifiedMessageQueue
()
throws
MQClientException
{
if
(
this
.
notUsingLogicalQueue
())
{
return
this
.
mq
;
}
this
.
writableQueueRouteData
=
getWritableQueueRouteData
();
MessageQueue
mq
=
new
MessageQueue
(
this
.
mq
);
mq
.
setBrokerName
(
writableQueueRouteData
.
getBrokerName
());
mq
.
setQueueId
(
writableQueueRouteData
.
getQueueId
());
return
mq
;
}
private
LogicalQueueRouteData
getWritableQueueRouteData
()
throws
MQClientException
{
this
.
logicalQueuesInfo
.
readLock
().
lock
();
try
{
List
<
LogicalQueueRouteData
>
queueRouteDataList
=
logicalQueuesInfo
.
get
(
mq
.
getQueueId
());
if
(
queueRouteDataList
==
null
||
queueRouteDataList
.
size
()
==
0
)
{
throw
new
MQClientException
(
String
.
format
(
Locale
.
ENGLISH
,
"send to a logical queue %d but no queue route data found"
,
mq
.
getQueueId
()),
null
);
}
// usually writable queue is placed in the last position, or second last when queue migrating
for
(
int
i
=
queueRouteDataList
.
size
()
-
1
;
i
>=
0
;
i
--)
{
LogicalQueueRouteData
queueRouteData
=
queueRouteDataList
.
get
(
i
);
if
(
queueRouteData
.
isWritable
())
{
return
queueRouteData
;
}
}
throw
new
MQClientException
(
String
.
format
(
Locale
.
ENGLISH
,
"send to a logical queue %d but no writable queue route data found"
,
mq
.
getQueueId
()),
null
);
}
finally
{
this
.
logicalQueuesInfo
.
readLock
().
unlock
();
}
}
@Override
public
void
onSuccess
(
SendResult
sendResult
)
{
this
.
sendCallback
.
onSuccess
(
this
.
wrapSendResult
(
sendResult
));
}
@Override
public
void
onException
(
Throwable
t
)
{
if
(
this
.
shouldRetry
(
t
))
{
try
{
DefaultMQProducerImpl
.
this
.
sendKernelImplWithoutRetry
(
msg
,
this
.
getModifiedMessageQueue
(),
communicationMode
,
this
,
topicPublishInfo
,
timeout
);
return
;
}
catch
(
Exception
e
)
{
t
=
e
;
}
}
if
(
t
instanceof
MQRedirectException
)
{
t
=
new
MQBrokerException
(
ResponseCode
.
SYSTEM_ERROR
,
"redirect"
);
}
this
.
sendCallback
.
onException
(
t
);
}
private
void
handleRedirectException
(
MQRedirectException
re
)
{
byte
[]
responseBody
=
re
.
getBody
();
log
.
info
(
"LogicalQueueContext.processResponseBody got redirect {}: {}"
,
this
.
writableQueueRouteData
,
responseBody
!=
null
?
new
String
(
responseBody
,
MessageDecoder
.
CHARSET_UTF8
)
:
null
);
try
{
List
<
LogicalQueueRouteData
>
newQueueRouteDataList
=
JSON
.
parseObject
(
responseBody
,
MixAll
.
TYPE_LIST_LOGICAL_QUEUE_ROUTE_DATA
);
this
.
logicalQueuesInfo
.
updateLogicalQueueRouteDataList
(
this
.
mq
.
getQueueId
(),
newQueueRouteDataList
);
}
catch
(
Exception
e
)
{
log
.
warn
(
"LogicalQueueContext.processResponseBody {} update exception, fallback to updateTopicRouteInfoFromNameServer"
,
this
.
writableQueueRouteData
,
e
);
DefaultMQProducerImpl
.
this
.
mQClientFactory
.
updateTopicRouteInfoFromNameServer
(
this
.
mq
.
getTopic
(),
false
,
null
,
Collections
.
singleton
(
mq
.
getQueueId
()));
TopicRouteData
topicRouteData
=
DefaultMQProducerImpl
.
this
.
mQClientFactory
.
getAnExistTopicRouteData
(
mq
.
getTopic
());
if
(
topicRouteData
!=
null
)
{
this
.
logicalQueuesInfo
=
topicRouteData
.
getLogicalQueuesInfo
();
}
else
{
this
.
logicalQueuesInfo
=
null
;
}
}
}
public
SendCallback
wrapSendCallback
()
{
if
(
this
.
notUsingLogicalQueue
())
{
return
this
.
sendCallback
;
}
if
(!
CommunicationMode
.
ASYNC
.
equals
(
this
.
communicationMode
))
{
return
this
.
sendCallback
;
}
return
this
;
}
public
boolean
shouldRetry
(
Throwable
t
)
{
this
.
incrRetry
();
if
(
this
.
exceedMaxRetry
())
{
log
.
warn
(
"retry {} too many times: {}"
,
this
.
retry
.
get
(),
this
.
writableQueueRouteData
);
return
false
;
}
if
(!
this
.
writableQueueRouteData
.
isWritable
())
{
log
.
warn
(
"no writable queue: {}"
,
this
.
writableQueueRouteData
);
return
false
;
}
if
(
t
instanceof
MQRedirectException
)
{
this
.
handleRedirectException
((
MQRedirectException
)
t
);
return
true
;
}
return
!(
t
instanceof
RemotingException
)
||
this
.
handleRemotingException
((
RemotingException
)
t
);
}
public
boolean
exceedMaxRetry
()
{
return
this
.
retry
.
get
()
>=
3
;
}
public
void
incrRetry
()
{
this
.
retry
.
incrementAndGet
();
}
public
SendResult
wrapSendResult
(
SendResult
sendResult
)
{
if
(
sendResult
==
null
)
{
return
null
;
}
SendResultForLogicalQueue
newSendResult
=
new
SendResultForLogicalQueue
(
sendResult
,
this
.
writableQueueRouteData
.
getLogicalQueueIndex
());
long
queueOffset
=
newSendResult
.
getQueueOffset
();
if
(
queueOffset
>=
0
)
{
newSendResult
.
setQueueOffset
(
LogicalQueueSendContext
.
this
.
writableQueueRouteData
.
toLogicalQueueOffset
(
queueOffset
));
}
return
newSendResult
;
}
public
boolean
handleRemotingException
(
RemotingException
e
)
{
if
(
e
instanceof
RemotingTooMuchRequestException
)
{
return
false
;
}
DefaultMQProducerImpl
.
this
.
mQClientFactory
.
updateTopicRouteInfoFromNameServer
(
this
.
mq
.
getTopic
(),
false
,
null
,
Collections
.
singleton
(
mq
.
getQueueId
()));
this
.
logicalQueuesInfo
=
DefaultMQProducerImpl
.
this
.
getTopicPublishInfoTable
().
get
(
mq
.
getTopic
()).
getTopicRouteData
().
getLogicalQueuesInfo
();
LogicalQueueRouteData
writableQueueRouteData
;
try
{
writableQueueRouteData
=
this
.
getWritableQueueRouteData
();
}
catch
(
MQClientException
ce
)
{
log
.
warn
(
"getWritableQueueRouteData exception: {}"
,
this
.
logicalQueuesInfo
.
get
(
mq
.
getQueueId
()),
ce
);
return
false
;
}
if
(
Objects
.
equal
(
this
.
writableQueueRouteData
.
getMessageQueue
(),
writableQueueRouteData
.
getMessageQueue
())
&&
writableQueueRouteData
.
isWritable
())
{
// still same MessageQueue and still writable, no need to retry
return
false
;
}
return
true
;
}
}
}
common/src/main/java/org/apache/rocketmq/common/sysflag/MessageSysFlag.java
浏览文件 @
c0ffc5bd
...
...
@@ -25,6 +25,7 @@ public class MessageSysFlag {
public
final
static
int
TRANSACTION_ROLLBACK_TYPE
=
0x3
<<
2
;
public
final
static
int
BORNHOST_V6_FLAG
=
0x1
<<
4
;
public
final
static
int
STOREHOSTADDRESS_V6_FLAG
=
0x1
<<
5
;
//TODO remove the LOGICAL_QUEUE_FLAG
public
final
static
int
LOGICAL_QUEUE_FLAG
=
0x1
<<
6
;
public
static
int
getTransactionValue
(
final
int
flag
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录