Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
Rocketmq
提交
a9837d9c
R
Rocketmq
项目概览
慢慢CG
/
Rocketmq
与 Fork 源项目一致
Fork自
Apache RocketMQ / Rocketmq
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
未验证
提交
a9837d9c
编写于
9月 18, 2021
作者:
youlixishia
提交者:
GitHub
9月 18, 2021
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #2965 from zhangjidi2016/add_getConsumerConfig_command
[ISSUE #2964] Add a query consumer config command in mqadmin.
上级
67ccb7f7
46bd4be6
变更
6
显示空白变更内容
内联
并排
Showing
6 changed file
with
238 addition
and
4 deletion
+238
-4
tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
...va/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+2
-1
tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
...rg/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+4
-2
tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
...main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+1
-1
tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
...ava/org/apache/rocketmq/tools/command/MQAdminStartup.java
+2
-0
tools/src/main/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommand.java
...q/tools/command/consumer/GetConsumerConfigSubCommand.java
+146
-0
tools/src/test/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommandTest.java
...ols/command/consumer/GetConsumerConfigSubCommandTest.java
+83
-0
未找到文件。
tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
浏览文件 @
a9837d9c
...
...
@@ -203,7 +203,8 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
}
@Override
public
SubscriptionGroupConfig
examineSubscriptionGroupConfig
(
String
addr
,
String
group
)
{
public
SubscriptionGroupConfig
examineSubscriptionGroupConfig
(
String
addr
,
String
group
)
throws
InterruptedException
,
RemotingException
,
MQClientException
,
MQBrokerException
{
return
defaultMQAdminExtImpl
.
examineSubscriptionGroupConfig
(
addr
,
group
);
}
...
...
tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
浏览文件 @
a9837d9c
...
...
@@ -217,8 +217,10 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
public
SubscriptionGroupConfig
examineSubscriptionGroupConfig
(
String
addr
,
String
group
)
{
return
null
;
public
SubscriptionGroupConfig
examineSubscriptionGroupConfig
(
String
addr
,
String
group
)
throws
InterruptedException
,
RemotingException
,
MQClientException
,
MQBrokerException
{
SubscriptionGroupWrapper
wrapper
=
this
.
mqClientInstance
.
getMQClientAPIImpl
().
getAllSubscriptionGroup
(
addr
,
timeoutMillis
);
return
wrapper
.
getSubscriptionGroupTable
().
get
(
group
);
}
@Override
...
...
tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
浏览文件 @
a9837d9c
...
...
@@ -90,7 +90,7 @@ public interface MQAdminExt extends MQAdmin {
final
SubscriptionGroupConfig
config
)
throws
RemotingException
,
MQBrokerException
,
InterruptedException
,
MQClientException
;
SubscriptionGroupConfig
examineSubscriptionGroupConfig
(
final
String
addr
,
final
String
group
);
SubscriptionGroupConfig
examineSubscriptionGroupConfig
(
final
String
addr
,
final
String
group
)
throws
InterruptedException
,
RemotingException
,
MQClientException
,
MQBrokerException
;
TopicConfig
examineTopicConfig
(
final
String
addr
,
final
String
topic
);
...
...
tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
浏览文件 @
a9837d9c
...
...
@@ -49,6 +49,7 @@ import org.apache.rocketmq.tools.command.connection.ProducerConnectionSubCommand
import
org.apache.rocketmq.tools.command.consumer.ConsumerProgressSubCommand
;
import
org.apache.rocketmq.tools.command.consumer.ConsumerStatusSubCommand
;
import
org.apache.rocketmq.tools.command.consumer.DeleteSubscriptionGroupCommand
;
import
org.apache.rocketmq.tools.command.consumer.GetConsumerConfigSubCommand
;
import
org.apache.rocketmq.tools.command.consumer.StartMonitoringSubCommand
;
import
org.apache.rocketmq.tools.command.consumer.UpdateSubGroupSubCommand
;
import
org.apache.rocketmq.tools.command.message.CheckMsgSendRTCommand
;
...
...
@@ -204,6 +205,7 @@ public class MQAdminStartup {
initCommand
(
new
GetNamesrvConfigCommand
());
initCommand
(
new
UpdateNamesrvConfigCommand
());
initCommand
(
new
GetBrokerConfigCommand
());
initCommand
(
new
GetConsumerConfigSubCommand
());
initCommand
(
new
QueryConsumeQueueCommand
());
initCommand
(
new
SendMessageCommand
());
...
...
tools/src/main/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommand.java
0 → 100644
浏览文件 @
a9837d9c
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.tools.command.consumer
;
import
java.lang.reflect.Field
;
import
java.util.ArrayList
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Set
;
import
org.apache.commons.cli.CommandLine
;
import
org.apache.commons.cli.Option
;
import
org.apache.commons.cli.Options
;
import
org.apache.commons.collections.CollectionUtils
;
import
org.apache.rocketmq.common.protocol.body.ClusterInfo
;
import
org.apache.rocketmq.common.subscription.SubscriptionGroupConfig
;
import
org.apache.rocketmq.remoting.RPCHook
;
import
org.apache.rocketmq.tools.admin.DefaultMQAdminExt
;
import
org.apache.rocketmq.tools.command.SubCommand
;
import
org.apache.rocketmq.tools.command.SubCommandException
;
public
class
GetConsumerConfigSubCommand
implements
SubCommand
{
@Override
public
String
commandName
()
{
return
"getConsumerConfig"
;
}
@Override
public
String
commandDesc
()
{
return
"Get consumer config by subscription group name!"
;
}
@Override
public
Options
buildCommandlineOptions
(
final
Options
options
)
{
Option
opt
=
new
Option
(
"g"
,
"groupName"
,
true
,
"subscription group name"
);
opt
.
setRequired
(
true
);
options
.
addOption
(
opt
);
return
options
;
}
@Override
public
void
execute
(
CommandLine
commandLine
,
Options
options
,
RPCHook
rpcHook
)
throws
SubCommandException
{
DefaultMQAdminExt
adminExt
=
new
DefaultMQAdminExt
(
rpcHook
);
adminExt
.
setInstanceName
(
Long
.
toString
(
System
.
currentTimeMillis
()));
String
groupName
=
commandLine
.
getOptionValue
(
'g'
).
trim
();
try
{
adminExt
.
start
();
List
<
ConsumerConfigInfo
>
consumerConfigInfoList
=
new
ArrayList
<>();
ClusterInfo
clusterInfo
=
adminExt
.
examineBrokerClusterInfo
();
Map
<
String
,
Set
<
String
>>
clusterAddrTable
=
clusterInfo
.
getClusterAddrTable
();
for
(
String
brokerName
:
clusterInfo
.
getBrokerAddrTable
().
keySet
())
{
String
clusterName
=
this
.
getClusterName
(
brokerName
,
clusterAddrTable
);
String
brokerAddress
=
clusterInfo
.
getBrokerAddrTable
().
get
(
brokerName
).
selectBrokerAddr
();
SubscriptionGroupConfig
subscriptionGroupConfig
=
adminExt
.
examineSubscriptionGroupConfig
(
brokerAddress
,
groupName
);
if
(
subscriptionGroupConfig
==
null
)
{
continue
;
}
consumerConfigInfoList
.
add
(
new
ConsumerConfigInfo
(
clusterName
,
brokerName
,
subscriptionGroupConfig
));
}
if
(
CollectionUtils
.
isEmpty
(
consumerConfigInfoList
))
{
return
;
}
for
(
ConsumerConfigInfo
info
:
consumerConfigInfoList
)
{
System
.
out
.
printf
(
"=============================%s:%s=============================\n"
,
info
.
getClusterName
(),
info
.
getBrokerName
());
SubscriptionGroupConfig
config
=
info
.
getSubscriptionGroupConfig
();
Field
[]
fields
=
config
.
getClass
().
getDeclaredFields
();
for
(
Field
field
:
fields
)
{
field
.
setAccessible
(
true
);
if
(
field
.
get
(
config
)
!=
null
)
{
System
.
out
.
printf
(
"%s%-40s= %s\n"
,
""
,
field
.
getName
(),
field
.
get
(
config
).
toString
());
}
else
{
System
.
out
.
printf
(
"%s%-40s= %s\n"
,
""
,
field
.
getName
(),
""
);
}
}
}
}
catch
(
Exception
e
)
{
throw
new
SubCommandException
(
this
.
getClass
().
getSimpleName
()
+
" command failed"
,
e
);
}
finally
{
adminExt
.
shutdown
();
}
}
private
String
getClusterName
(
String
brokeName
,
Map
<
String
,
Set
<
String
>>
clusterAddrTable
)
{
for
(
Map
.
Entry
<
String
,
Set
<
String
>>
entry
:
clusterAddrTable
.
entrySet
())
{
Set
<
String
>
brokerNameSet
=
entry
.
getValue
();
if
(
brokerNameSet
.
contains
(
brokeName
))
{
return
entry
.
getKey
();
}
}
return
null
;
}
}
class
ConsumerConfigInfo
{
private
String
clusterName
;
private
String
brokerName
;
private
SubscriptionGroupConfig
subscriptionGroupConfig
;
public
ConsumerConfigInfo
(
String
clusterName
,
String
brokerName
,
SubscriptionGroupConfig
subscriptionGroupConfig
)
{
this
.
clusterName
=
clusterName
;
this
.
brokerName
=
brokerName
;
this
.
subscriptionGroupConfig
=
subscriptionGroupConfig
;
}
public
String
getClusterName
()
{
return
clusterName
;
}
public
void
setClusterName
(
String
clusterName
)
{
this
.
clusterName
=
clusterName
;
}
public
String
getBrokerName
()
{
return
brokerName
;
}
public
void
setBrokerName
(
String
brokerNameList
)
{
this
.
brokerName
=
brokerName
;
}
public
SubscriptionGroupConfig
getSubscriptionGroupConfig
()
{
return
subscriptionGroupConfig
;
}
public
void
setSubscriptionGroupConfig
(
SubscriptionGroupConfig
subscriptionGroupConfig
)
{
this
.
subscriptionGroupConfig
=
subscriptionGroupConfig
;
}
}
tools/src/test/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommandTest.java
0 → 100644
浏览文件 @
a9837d9c
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.tools.command.consumer
;
import
java.io.UnsupportedEncodingException
;
import
java.lang.reflect.Field
;
import
org.apache.commons.cli.CommandLine
;
import
org.apache.commons.cli.Options
;
import
org.apache.commons.cli.PosixParser
;
import
org.apache.rocketmq.client.ClientConfig
;
import
org.apache.rocketmq.client.exception.MQBrokerException
;
import
org.apache.rocketmq.client.exception.MQClientException
;
import
org.apache.rocketmq.client.impl.MQClientAPIImpl
;
import
org.apache.rocketmq.client.impl.MQClientManager
;
import
org.apache.rocketmq.client.impl.factory.MQClientInstance
;
import
org.apache.rocketmq.remoting.exception.RemotingConnectException
;
import
org.apache.rocketmq.remoting.exception.RemotingSendRequestException
;
import
org.apache.rocketmq.remoting.exception.RemotingTimeoutException
;
import
org.apache.rocketmq.srvutil.ServerUtil
;
import
org.apache.rocketmq.tools.admin.DefaultMQAdminExt
;
import
org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl
;
import
org.apache.rocketmq.tools.command.SubCommandException
;
import
org.junit.AfterClass
;
import
org.junit.BeforeClass
;
import
org.junit.Ignore
;
import
org.junit.Test
;
import
static
org
.
mockito
.
Mockito
.
mock
;
public
class
GetConsumerConfigSubCommandTest
{
private
static
DefaultMQAdminExt
defaultMQAdminExt
;
private
static
DefaultMQAdminExtImpl
defaultMQAdminExtImpl
;
private
static
MQClientInstance
mqClientInstance
=
MQClientManager
.
getInstance
().
getOrCreateMQClientInstance
(
new
ClientConfig
());
private
static
MQClientAPIImpl
mQClientAPIImpl
;
@BeforeClass
public
static
void
init
()
throws
NoSuchFieldException
,
IllegalAccessException
,
InterruptedException
,
RemotingTimeoutException
,
MQClientException
,
RemotingSendRequestException
,
RemotingConnectException
,
MQBrokerException
,
UnsupportedEncodingException
{
mQClientAPIImpl
=
mock
(
MQClientAPIImpl
.
class
);
defaultMQAdminExt
=
new
DefaultMQAdminExt
();
defaultMQAdminExtImpl
=
new
DefaultMQAdminExtImpl
(
defaultMQAdminExt
,
1000
);
Field
field
=
DefaultMQAdminExtImpl
.
class
.
getDeclaredField
(
"mqClientInstance"
);
field
.
setAccessible
(
true
);
field
.
set
(
defaultMQAdminExtImpl
,
mqClientInstance
);
field
=
MQClientInstance
.
class
.
getDeclaredField
(
"mQClientAPIImpl"
);
field
.
setAccessible
(
true
);
field
.
set
(
mqClientInstance
,
mQClientAPIImpl
);
field
=
DefaultMQAdminExt
.
class
.
getDeclaredField
(
"defaultMQAdminExtImpl"
);
field
.
setAccessible
(
true
);
field
.
set
(
defaultMQAdminExt
,
defaultMQAdminExtImpl
);
}
@AfterClass
public
static
void
terminate
()
{
defaultMQAdminExt
.
shutdown
();
}
@Ignore
@Test
public
void
testExecute
()
throws
SubCommandException
{
System
.
setProperty
(
"rocketmq.namesrv.addr"
,
"127.0.0.1:9876"
);
GetConsumerConfigSubCommand
cmd
=
new
GetConsumerConfigSubCommand
();
Options
options
=
ServerUtil
.
buildCommandlineOptions
(
new
Options
());
String
[]
subargs
=
new
String
[]
{
"-g group_test"
};
final
CommandLine
commandLine
=
ServerUtil
.
parseCmdLine
(
"mqadmin "
+
cmd
.
commandName
(),
subargs
,
cmd
.
buildCommandlineOptions
(
options
),
new
PosixParser
());
cmd
.
execute
(
commandLine
,
options
,
null
);
}
}
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录