Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Apache RocketMQ
Rocketmq
提交
9d34fadc
R
Rocketmq
项目概览
Apache RocketMQ
/
Rocketmq
上一次同步 大约 3 年
通知
268
Star
16139
Fork
68
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
未验证
提交
9d34fadc
编写于
8月 19, 2020
作者:
A
affe
提交者:
GitHub
8月 19, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #2236 from Demogorgon314/develop
[ISSUE #2233] Enhancement MQBrokerException include broker information…
上级
a093f8a6
e088f2ea
变更
4
显示空白变更内容
内联
并排
Showing
4 changed file
with
60 addition
and
44 deletion
+60
-44
broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
...n/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+6
-6
client/src/main/java/org/apache/rocketmq/client/exception/MQBrokerException.java
...g/apache/rocketmq/client/exception/MQBrokerException.java
+15
-1
client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
...java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+38
-36
client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
...mq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
+1
-1
未找到文件。
broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
浏览文件 @
9d34fadc
...
...
@@ -209,7 +209,7 @@ public class BrokerOuterAPI {
break
;
}
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
());
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
()
,
requestHeader
==
null
?
null
:
requestHeader
.
getBrokerAddr
()
);
}
public
void
unregisterBrokerAll
(
...
...
@@ -255,7 +255,7 @@ public class BrokerOuterAPI {
break
;
}
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
());
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
()
,
brokerAddr
);
}
public
List
<
Boolean
>
needRegister
(
...
...
@@ -338,7 +338,7 @@ public class BrokerOuterAPI {
break
;
}
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
());
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
()
,
addr
);
}
public
ConsumerOffsetSerializeWrapper
getAllConsumerOffset
(
...
...
@@ -355,7 +355,7 @@ public class BrokerOuterAPI {
break
;
}
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
());
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
()
,
addr
);
}
public
String
getAllDelayOffset
(
...
...
@@ -372,7 +372,7 @@ public class BrokerOuterAPI {
break
;
}
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
());
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
()
,
addr
);
}
public
SubscriptionGroupWrapper
getAllSubscriptionGroupConfig
(
...
...
@@ -389,7 +389,7 @@ public class BrokerOuterAPI {
break
;
}
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
());
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
()
,
addr
);
}
public
void
registerRPCHook
(
RPCHook
rpcHook
)
{
...
...
client/src/main/java/org/apache/rocketmq/client/exception/MQBrokerException.java
浏览文件 @
9d34fadc
...
...
@@ -23,12 +23,22 @@ public class MQBrokerException extends Exception {
private
static
final
long
serialVersionUID
=
5975020272601250368L
;
private
final
int
responseCode
;
private
final
String
errorMessage
;
private
final
String
brokerAddr
;
public
MQBrokerException
(
int
responseCode
,
String
errorMessage
)
{
super
(
FAQUrl
.
attachDefaultURL
(
"CODE: "
+
UtilAll
.
responseCode2String
(
responseCode
)
+
" DESC: "
+
errorMessage
));
this
.
responseCode
=
responseCode
;
this
.
errorMessage
=
errorMessage
;
this
.
brokerAddr
=
null
;
}
public
MQBrokerException
(
int
responseCode
,
String
errorMessage
,
String
brokerAddr
)
{
super
(
FAQUrl
.
attachDefaultURL
(
"CODE: "
+
UtilAll
.
responseCode2String
(
responseCode
)
+
" DESC: "
+
errorMessage
+
(
brokerAddr
!=
null
?
" BROKER: "
+
brokerAddr
:
""
)));
this
.
responseCode
=
responseCode
;
this
.
errorMessage
=
errorMessage
;
this
.
brokerAddr
=
brokerAddr
;
}
public
int
getResponseCode
()
{
...
...
@@ -38,4 +48,8 @@ public class MQBrokerException extends Exception {
public
String
getErrorMessage
()
{
return
errorMessage
;
}
public
String
getBrokerAddr
()
{
return
brokerAddr
;
}
}
client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
浏览文件 @
9d34fadc
...
...
@@ -390,7 +390,7 @@ public class MQClientAPIImpl {
break
;
}
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
());
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
()
,
addr
);
}
...
...
@@ -414,7 +414,7 @@ public class MQClientAPIImpl {
default
:
break
;
}
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
());
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
()
,
addr
);
}
...
...
@@ -502,7 +502,7 @@ public class MQClientAPIImpl {
)
throws
RemotingException
,
MQBrokerException
,
InterruptedException
{
RemotingCommand
response
=
this
.
remotingClient
.
invokeSync
(
addr
,
request
,
timeoutMillis
);
assert
response
!=
null
;
return
this
.
processSendResponse
(
brokerName
,
msg
,
response
);
return
this
.
processSendResponse
(
brokerName
,
msg
,
response
,
addr
);
}
private
void
sendMessageAsync
(
...
...
@@ -528,7 +528,7 @@ public class MQClientAPIImpl {
if
(
null
==
sendCallback
&&
response
!=
null
)
{
try
{
SendResult
sendResult
=
MQClientAPIImpl
.
this
.
processSendResponse
(
brokerName
,
msg
,
response
);
SendResult
sendResult
=
MQClientAPIImpl
.
this
.
processSendResponse
(
brokerName
,
msg
,
response
,
addr
);
if
(
context
!=
null
&&
sendResult
!=
null
)
{
context
.
setSendResult
(
sendResult
);
context
.
getProducer
().
executeSendMessageHookAfter
(
context
);
...
...
@@ -542,7 +542,7 @@ public class MQClientAPIImpl {
if
(
response
!=
null
)
{
try
{
SendResult
sendResult
=
MQClientAPIImpl
.
this
.
processSendResponse
(
brokerName
,
msg
,
response
);
SendResult
sendResult
=
MQClientAPIImpl
.
this
.
processSendResponse
(
brokerName
,
msg
,
response
,
addr
);
assert
sendResult
!=
null
;
if
(
context
!=
null
)
{
context
.
setSendResult
(
sendResult
);
...
...
@@ -641,7 +641,8 @@ public class MQClientAPIImpl {
private
SendResult
processSendResponse
(
final
String
brokerName
,
final
Message
msg
,
final
RemotingCommand
response
final
RemotingCommand
response
,
final
String
addr
)
throws
MQBrokerException
,
RemotingCommandException
{
SendStatus
sendStatus
;
switch
(
response
.
getCode
())
{
...
...
@@ -662,7 +663,7 @@ public class MQClientAPIImpl {
break
;
}
default
:
{
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
());
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
()
,
addr
);
}
}
...
...
@@ -741,7 +742,7 @@ public class MQClientAPIImpl {
RemotingCommand
response
=
responseFuture
.
getResponseCommand
();
if
(
response
!=
null
)
{
try
{
PullResult
pullResult
=
MQClientAPIImpl
.
this
.
processPullResponse
(
response
);
PullResult
pullResult
=
MQClientAPIImpl
.
this
.
processPullResponse
(
response
,
addr
);
assert
pullResult
!=
null
;
pullCallback
.
onSuccess
(
pullResult
);
}
catch
(
Exception
e
)
{
...
...
@@ -768,11 +769,12 @@ public class MQClientAPIImpl {
)
throws
RemotingException
,
InterruptedException
,
MQBrokerException
{
RemotingCommand
response
=
this
.
remotingClient
.
invokeSync
(
addr
,
request
,
timeoutMillis
);
assert
response
!=
null
;
return
this
.
processPullResponse
(
response
);
return
this
.
processPullResponse
(
response
,
addr
);
}
private
PullResult
processPullResponse
(
final
RemotingCommand
response
)
throws
MQBrokerException
,
RemotingCommandException
{
final
RemotingCommand
response
,
final
String
addr
)
throws
MQBrokerException
,
RemotingCommandException
{
PullStatus
pullStatus
=
PullStatus
.
NO_NEW_MSG
;
switch
(
response
.
getCode
())
{
case
ResponseCode
.
SUCCESS
:
...
...
@@ -789,7 +791,7 @@ public class MQClientAPIImpl {
break
;
default
:
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
());
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
()
,
addr
);
}
PullMessageResponseHeader
responseHeader
=
...
...
@@ -822,7 +824,7 @@ public class MQClientAPIImpl {
break
;
}
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
());
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
()
,
addr
);
}
public
long
searchOffset
(
final
String
addr
,
final
String
topic
,
final
int
queueId
,
final
long
timestamp
,
...
...
@@ -847,7 +849,7 @@ public class MQClientAPIImpl {
break
;
}
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
());
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
()
,
addr
);
}
public
long
getMaxOffset
(
final
String
addr
,
final
String
topic
,
final
int
queueId
,
final
long
timeoutMillis
)
...
...
@@ -871,7 +873,7 @@ public class MQClientAPIImpl {
break
;
}
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
());
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
()
,
addr
);
}
public
List
<
String
>
getConsumerIdListByGroup
(
...
...
@@ -898,7 +900,7 @@ public class MQClientAPIImpl {
break
;
}
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
());
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
()
,
addr
);
}
public
long
getMinOffset
(
final
String
addr
,
final
String
topic
,
final
int
queueId
,
final
long
timeoutMillis
)
...
...
@@ -922,7 +924,7 @@ public class MQClientAPIImpl {
break
;
}
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
());
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
()
,
addr
);
}
public
long
getEarliestMsgStoretime
(
final
String
addr
,
final
String
topic
,
final
int
queueId
,
...
...
@@ -947,7 +949,7 @@ public class MQClientAPIImpl {
break
;
}
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
());
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
()
,
addr
);
}
public
long
queryConsumerOffset
(
...
...
@@ -971,7 +973,7 @@ public class MQClientAPIImpl {
break
;
}
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
());
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
()
,
addr
);
}
public
void
updateConsumerOffset
(
...
...
@@ -992,7 +994,7 @@ public class MQClientAPIImpl {
break
;
}
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
());
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
()
,
addr
);
}
public
void
updateConsumerOffsetOneway
(
...
...
@@ -1024,7 +1026,7 @@ public class MQClientAPIImpl {
break
;
}
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
());
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
()
,
addr
);
}
public
void
unregisterClient
(
...
...
@@ -1050,7 +1052,7 @@ public class MQClientAPIImpl {
break
;
}
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
());
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
()
,
addr
);
}
public
void
endTransactionOneway
(
...
...
@@ -1116,7 +1118,7 @@ public class MQClientAPIImpl {
break
;
}
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
());
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
()
,
addr
);
}
public
Set
<
MessageQueue
>
lockBatchMQ
(
...
...
@@ -1138,7 +1140,7 @@ public class MQClientAPIImpl {
break
;
}
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
());
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
()
,
addr
);
}
public
void
unlockBatchMQ
(
...
...
@@ -1164,7 +1166,7 @@ public class MQClientAPIImpl {
break
;
}
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
());
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
()
,
addr
);
}
}
...
...
@@ -1187,7 +1189,7 @@ public class MQClientAPIImpl {
break
;
}
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
());
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
()
,
addr
);
}
public
ConsumeStats
getConsumeStats
(
final
String
addr
,
final
String
consumerGroup
,
final
long
timeoutMillis
)
...
...
@@ -1217,7 +1219,7 @@ public class MQClientAPIImpl {
break
;
}
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
());
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
()
,
addr
);
}
public
ProducerConnection
getProducerConnectionList
(
final
String
addr
,
final
String
producerGroup
,
...
...
@@ -1239,7 +1241,7 @@ public class MQClientAPIImpl {
break
;
}
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
());
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
()
,
addr
);
}
public
ConsumerConnection
getConsumerConnectionList
(
final
String
addr
,
final
String
consumerGroup
,
...
...
@@ -1261,7 +1263,7 @@ public class MQClientAPIImpl {
break
;
}
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
());
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
()
,
addr
);
}
public
KVTable
getBrokerRuntimeInfo
(
final
String
addr
,
final
long
timeoutMillis
)
throws
RemotingConnectException
,
...
...
@@ -1279,7 +1281,7 @@ public class MQClientAPIImpl {
break
;
}
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
());
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
()
,
addr
);
}
public
void
updateBrokerConfig
(
final
String
addr
,
final
Properties
properties
,
final
long
timeoutMillis
)
...
...
@@ -1301,7 +1303,7 @@ public class MQClientAPIImpl {
break
;
}
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
());
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
()
,
addr
);
}
}
...
...
@@ -1320,7 +1322,7 @@ public class MQClientAPIImpl {
break
;
}
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
());
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
()
,
addr
);
}
public
ClusterInfo
getBrokerClusterInfo
(
...
...
@@ -1670,7 +1672,7 @@ public class MQClientAPIImpl {
break
;
}
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
());
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
()
,
addr
);
}
public
List
<
QueueTimeSpan
>
queryConsumeTimeSpan
(
final
String
addr
,
final
String
topic
,
final
String
group
,
...
...
@@ -1694,7 +1696,7 @@ public class MQClientAPIImpl {
break
;
}
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
());
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
()
,
addr
);
}
public
TopicList
getTopicsByCluster
(
final
String
cluster
,
final
long
timeoutMillis
)
...
...
@@ -1745,7 +1747,7 @@ public class MQClientAPIImpl {
break
;
}
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
());
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
()
,
addr
);
}
public
TopicList
getSystemTopicList
(
...
...
@@ -2108,7 +2110,7 @@ public class MQClientAPIImpl {
default
:
break
;
}
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
());
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
()
,
brokerAddr
);
}
public
TopicConfigSerializeWrapper
getAllTopicConfig
(
final
String
addr
,
...
...
@@ -2127,7 +2129,7 @@ public class MQClientAPIImpl {
break
;
}
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
());
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
()
,
addr
);
}
public
void
updateNameServerConfig
(
final
Properties
properties
,
final
List
<
String
>
nameServers
,
long
timeoutMillis
)
...
...
client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
浏览文件 @
9d34fadc
...
...
@@ -84,7 +84,7 @@ public class RemoteBrokerOffsetStoreTest {
offsetStore
.
updateOffset
(
messageQueue
,
1024
,
false
);
doThrow
(
new
MQBrokerException
(-
1
,
""
))
doThrow
(
new
MQBrokerException
(-
1
,
""
,
null
))
.
when
(
mqClientAPI
).
queryConsumerOffset
(
anyString
(),
any
(
QueryConsumerOffsetRequestHeader
.
class
),
anyLong
());
assertThat
(
offsetStore
.
readOffset
(
messageQueue
,
ReadOffsetType
.
READ_FROM_STORE
)).
isEqualTo
(-
1
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录