Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Apache RocketMQ
Rocketmq
提交
a3954b45
R
Rocketmq
项目概览
Apache RocketMQ
/
Rocketmq
上一次同步 大约 3 年
通知
267
Star
16139
Fork
68
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
a3954b45
编写于
9月 29, 2019
作者:
Z
zhangjidi
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
GetAccessConfigSubCommand for ACL configuration (#1263)
上级
3a624d7e
变更
16
显示空白变更内容
内联
并排
Showing
16 changed file
with
410 addition
and
0 deletion
+410
-0
acl/src/main/java/org/apache/rocketmq/acl/AccessValidator.java
...rc/main/java/org/apache/rocketmq/acl/AccessValidator.java
+7
-0
acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessValidator.java
...a/org/apache/rocketmq/acl/plain/PlainAccessValidator.java
+4
-0
acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionManager.java
...org/apache/rocketmq/acl/plain/PlainPermissionManager.java
+23
-0
broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
...pache/rocketmq/broker/processor/AdminBrokerProcessor.java
+26
-0
client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
...java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+26
-0
common/src/main/java/org/apache/rocketmq/common/AclConfig.java
...n/src/main/java/org/apache/rocketmq/common/AclConfig.java
+43
-0
common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
...java/org/apache/rocketmq/common/protocol/RequestCode.java
+2
-0
common/src/main/java/org/apache/rocketmq/common/protocol/header/GetBrokerClusterAclConfigResponseBody.java
...rotocol/header/GetBrokerClusterAclConfigResponseBody.java
+46
-0
common/src/main/java/org/apache/rocketmq/common/protocol/header/GetBrokerClusterAclConfigResponseHeader.java
...tocol/header/GetBrokerClusterAclConfigResponseHeader.java
+43
-0
docs/cn/acl/user_guide.md
docs/cn/acl/user_guide.md
+12
-0
tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
...va/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+6
-0
tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
...rg/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+7
-0
tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
...main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+4
-0
tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
...ava/org/apache/rocketmq/tools/command/MQAdminStartup.java
+2
-0
tools/src/main/java/org/apache/rocketmq/tools/command/acl/GetAccessConfigSubCommand.java
...rocketmq/tools/command/acl/GetAccessConfigSubCommand.java
+123
-0
tools/src/test/java/org/apache/rocketmq/tools/command/acl/GetAccessConfigSubCommandTest.java
...etmq/tools/command/acl/GetAccessConfigSubCommandTest.java
+36
-0
未找到文件。
acl/src/main/java/org/apache/rocketmq/acl/AccessValidator.java
浏览文件 @
a3954b45
...
...
@@ -18,6 +18,7 @@
package
org.apache.rocketmq.acl
;
import
java.util.List
;
import
org.apache.rocketmq.common.AclConfig
;
import
org.apache.rocketmq.common.PlainAccessConfig
;
import
org.apache.rocketmq.remoting.protocol.RemotingCommand
;
...
...
@@ -66,4 +67,10 @@ public interface AccessValidator {
* @return
*/
boolean
updateGlobalWhiteAddrsConfig
(
List
<
String
>
globalWhiteAddrsList
);
/**
* get broker cluster acl config information
* @return
*/
AclConfig
getAllAclConfig
();
}
acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessValidator.java
浏览文件 @
a3954b45
...
...
@@ -26,6 +26,7 @@ import org.apache.rocketmq.acl.common.AclException;
import
org.apache.rocketmq.acl.common.AclUtils
;
import
org.apache.rocketmq.acl.common.Permission
;
import
org.apache.rocketmq.acl.common.SessionCredentials
;
import
org.apache.rocketmq.common.AclConfig
;
import
org.apache.rocketmq.common.PlainAccessConfig
;
import
org.apache.rocketmq.common.protocol.RequestCode
;
import
org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader
;
...
...
@@ -155,4 +156,7 @@ public class PlainAccessValidator implements AccessValidator {
return
aclPlugEngine
.
updateGlobalWhiteAddrsConfig
(
globalWhiteAddrsList
);
}
@Override
public
AclConfig
getAllAclConfig
()
{
return
aclPlugEngine
.
getAllAclConfig
();
}
}
acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionManager.java
浏览文件 @
a3954b45
...
...
@@ -30,6 +30,7 @@ import org.apache.rocketmq.acl.common.AclConstants;
import
org.apache.rocketmq.acl.common.AclException
;
import
org.apache.rocketmq.acl.common.AclUtils
;
import
org.apache.rocketmq.acl.common.Permission
;
import
org.apache.rocketmq.common.AclConfig
;
import
org.apache.rocketmq.common.DataVersion
;
import
org.apache.rocketmq.common.MixAll
;
import
org.apache.rocketmq.common.PlainAccessConfig
;
...
...
@@ -270,6 +271,28 @@ public class PlainPermissionManager {
return
false
;
}
public
AclConfig
getAllAclConfig
()
{
AclConfig
aclConfig
=
new
AclConfig
();
List
<
PlainAccessConfig
>
configs
=
new
ArrayList
<>();
List
<
String
>
whiteAddrs
=
new
ArrayList
<>();
JSONObject
plainAclConfData
=
AclUtils
.
getYamlDataObject
(
fileHome
+
File
.
separator
+
fileName
,
JSONObject
.
class
);
if
(
plainAclConfData
==
null
||
plainAclConfData
.
isEmpty
())
{
throw
new
AclException
(
String
.
format
(
"%s file is not data"
,
fileHome
+
File
.
separator
+
fileName
));
}
JSONArray
globalWhiteAddrs
=
plainAclConfData
.
getJSONArray
(
AclConstants
.
CONFIG_GLOBAL_WHITE_ADDRS
);
if
(
globalWhiteAddrs
!=
null
&&
!
globalWhiteAddrs
.
isEmpty
())
{
whiteAddrs
=
globalWhiteAddrs
.
toJavaList
(
String
.
class
);
}
JSONArray
accounts
=
plainAclConfData
.
getJSONArray
(
AclConstants
.
CONFIG_ACCOUNTS
);
if
(
accounts
!=
null
&&
!
accounts
.
isEmpty
())
{
configs
=
accounts
.
toJavaList
(
PlainAccessConfig
.
class
);
}
aclConfig
.
setGlobalWhiteAddrs
(
whiteAddrs
);
aclConfig
.
setPlainAccessConfigs
(
configs
);
return
aclConfig
;
}
private
void
watch
()
{
try
{
String
watchFilePath
=
fileHome
+
fileName
;
...
...
broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
浏览文件 @
a3954b45
...
...
@@ -42,6 +42,7 @@ import org.apache.rocketmq.common.MQVersion;
import
org.apache.rocketmq.common.MixAll
;
import
org.apache.rocketmq.common.PlainAccessConfig
;
import
org.apache.rocketmq.common.TopicConfig
;
import
org.apache.rocketmq.common.AclConfig
;
import
org.apache.rocketmq.common.UtilAll
;
import
org.apache.rocketmq.common.admin.ConsumeStats
;
import
org.apache.rocketmq.common.admin.OffsetWrapper
;
...
...
@@ -52,6 +53,8 @@ import org.apache.rocketmq.common.protocol.header.CreateAccessConfigRequestHeade
import
org.apache.rocketmq.common.protocol.header.DeleteAccessConfigRequestHeader
;
import
org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHeader
;
import
org.apache.rocketmq.common.protocol.header.UpdateGlobalWhiteAddrsConfigRequestHeader
;
import
org.apache.rocketmq.common.protocol.header.GetBrokerClusterAclConfigResponseHeader
;
import
org.apache.rocketmq.common.protocol.header.GetBrokerClusterAclConfigResponseBody
;
import
org.apache.rocketmq.logging.InternalLogger
;
import
org.apache.rocketmq.logging.InternalLoggerFactory
;
import
org.apache.rocketmq.common.message.MessageAccessor
;
...
...
@@ -226,6 +229,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return
updateGlobalWhiteAddrsConfig
(
ctx
,
request
);
case
RequestCode
.
RESUME_CHECK_HALF_MESSAGE
:
return
resumeCheckHalfMessage
(
ctx
,
request
);
case
RequestCode
.
GET_BROKER_CLUSTER_ACL_CONFIG
:
return
getBrokerClusterAclConfig
(
ctx
,
request
);
default
:
break
;
}
...
...
@@ -428,6 +433,27 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return
null
;
}
private
RemotingCommand
getBrokerClusterAclConfig
(
ChannelHandlerContext
ctx
,
RemotingCommand
request
)
{
final
RemotingCommand
response
=
RemotingCommand
.
createResponseCommand
(
GetBrokerClusterAclConfigResponseHeader
.
class
);
try
{
AccessValidator
accessValidator
=
this
.
brokerController
.
getAccessValidatorMap
().
get
(
PlainAccessValidator
.
class
);
GetBrokerClusterAclConfigResponseBody
body
=
new
GetBrokerClusterAclConfigResponseBody
();
AclConfig
aclConfig
=
accessValidator
.
getAllAclConfig
();
body
.
setGlobalWhiteAddrs
(
aclConfig
.
getGlobalWhiteAddrs
());
body
.
setPlainAccessConfigs
(
aclConfig
.
getPlainAccessConfigs
());
response
.
setCode
(
ResponseCode
.
SUCCESS
);
response
.
setBody
(
body
.
encode
());
response
.
setRemark
(
null
);
return
response
;
}
catch
(
Exception
e
)
{
log
.
error
(
"Failed to generate a proper getBrokerClusterAclConfig response"
,
e
);
}
return
null
;
}
private
RemotingCommand
getAllTopicConfig
(
ChannelHandlerContext
ctx
,
RemotingCommand
request
)
{
final
RemotingCommand
response
=
RemotingCommand
.
createResponseCommand
(
GetAllTopicConfigResponseHeader
.
class
);
// final GetAllTopicConfigResponseHeader responseHeader =
...
...
client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
浏览文件 @
a3954b45
...
...
@@ -49,6 +49,7 @@ import org.apache.rocketmq.common.MQVersion;
import
org.apache.rocketmq.common.MixAll
;
import
org.apache.rocketmq.common.PlainAccessConfig
;
import
org.apache.rocketmq.common.TopicConfig
;
import
org.apache.rocketmq.common.AclConfig
;
import
org.apache.rocketmq.common.UtilAll
;
import
org.apache.rocketmq.common.admin.ConsumeStats
;
import
org.apache.rocketmq.common.admin.TopicStatsTable
;
...
...
@@ -96,6 +97,7 @@ import org.apache.rocketmq.common.protocol.header.DeleteSubscriptionGroupRequest
import
org.apache.rocketmq.common.protocol.header.DeleteTopicRequestHeader
;
import
org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader
;
import
org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHeader
;
import
org.apache.rocketmq.common.protocol.header.GetBrokerClusterAclConfigResponseBody
;
import
org.apache.rocketmq.common.protocol.header.GetConsumeStatsInBrokerHeader
;
import
org.apache.rocketmq.common.protocol.header.GetConsumeStatsRequestHeader
;
import
org.apache.rocketmq.common.protocol.header.GetConsumerConnectionListRequestHeader
;
...
...
@@ -390,6 +392,30 @@ public class MQClientAPIImpl {
}
public
AclConfig
getBrokerClusterConfig
(
final
String
addr
,
final
long
timeoutMillis
)
throws
RemotingCommandException
,
InterruptedException
,
RemotingTimeoutException
,
RemotingSendRequestException
,
RemotingConnectException
,
MQBrokerException
{
RemotingCommand
request
=
RemotingCommand
.
createRequestCommand
(
RequestCode
.
GET_BROKER_CLUSTER_ACL_CONFIG
,
null
);
RemotingCommand
response
=
this
.
remotingClient
.
invokeSync
(
MixAll
.
brokerVIPChannel
(
this
.
clientConfig
.
isVipChannelEnabled
(),
addr
),
request
,
timeoutMillis
);
assert
response
!=
null
;
switch
(
response
.
getCode
())
{
case
ResponseCode
.
SUCCESS
:
{
if
(
response
.
getBody
()
!=
null
)
{
GetBrokerClusterAclConfigResponseBody
body
=
GetBrokerClusterAclConfigResponseBody
.
decode
(
response
.
getBody
(),
GetBrokerClusterAclConfigResponseBody
.
class
);
AclConfig
aclConfig
=
new
AclConfig
();
aclConfig
.
setGlobalWhiteAddrs
(
body
.
getGlobalWhiteAddrs
());
aclConfig
.
setPlainAccessConfigs
(
body
.
getPlainAccessConfigs
());
return
aclConfig
;
}
}
default
:
break
;
}
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
());
}
public
SendResult
sendMessage
(
final
String
addr
,
final
String
brokerName
,
...
...
common/src/main/java/org/apache/rocketmq/common/AclConfig.java
0 → 100644
浏览文件 @
a3954b45
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.common
;
import
java.util.List
;
public
class
AclConfig
{
private
List
<
String
>
globalWhiteAddrs
;
private
List
<
PlainAccessConfig
>
plainAccessConfigs
;
public
List
<
String
>
getGlobalWhiteAddrs
()
{
return
globalWhiteAddrs
;
}
public
void
setGlobalWhiteAddrs
(
List
<
String
>
globalWhiteAddrs
)
{
this
.
globalWhiteAddrs
=
globalWhiteAddrs
;
}
public
List
<
PlainAccessConfig
>
getPlainAccessConfigs
()
{
return
plainAccessConfigs
;
}
public
void
setPlainAccessConfigs
(
List
<
PlainAccessConfig
>
plainAccessConfigs
)
{
this
.
plainAccessConfigs
=
plainAccessConfigs
;
}
}
common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
浏览文件 @
a3954b45
...
...
@@ -78,6 +78,8 @@ public class RequestCode {
public
static
final
int
UPDATE_GLOBAL_WHITE_ADDRS_CONFIG
=
53
;
public
static
final
int
GET_BROKER_CLUSTER_ACL_CONFIG
=
54
;
public
static
final
int
PUT_KV_CONFIG
=
100
;
public
static
final
int
GET_KV_CONFIG
=
101
;
...
...
common/src/main/java/org/apache/rocketmq/common/protocol/header/GetBrokerClusterAclConfigResponseBody.java
0 → 100644
浏览文件 @
a3954b45
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.common.protocol.header
;
import
org.apache.rocketmq.common.PlainAccessConfig
;
import
org.apache.rocketmq.remoting.protocol.RemotingSerializable
;
import
java.util.List
;
public
class
GetBrokerClusterAclConfigResponseBody
extends
RemotingSerializable
{
private
List
<
String
>
globalWhiteAddrs
;
private
List
<
PlainAccessConfig
>
plainAccessConfigs
;
public
List
<
String
>
getGlobalWhiteAddrs
()
{
return
globalWhiteAddrs
;
}
public
void
setGlobalWhiteAddrs
(
List
<
String
>
globalWhiteAddrs
)
{
this
.
globalWhiteAddrs
=
globalWhiteAddrs
;
}
public
List
<
PlainAccessConfig
>
getPlainAccessConfigs
()
{
return
plainAccessConfigs
;
}
public
void
setPlainAccessConfigs
(
List
<
PlainAccessConfig
>
plainAccessConfigs
)
{
this
.
plainAccessConfigs
=
plainAccessConfigs
;
}
}
common/src/main/java/org/apache/rocketmq/common/protocol/header/GetBrokerClusterAclConfigResponseHeader.java
0 → 100644
浏览文件 @
a3954b45
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.common.protocol.header
;
import
org.apache.rocketmq.common.PlainAccessConfig
;
import
org.apache.rocketmq.remoting.CommandCustomHeader
;
import
org.apache.rocketmq.remoting.annotation.CFNotNull
;
import
org.apache.rocketmq.remoting.exception.RemotingCommandException
;
import
java.util.List
;
public
class
GetBrokerClusterAclConfigResponseHeader
implements
CommandCustomHeader
{
@CFNotNull
private
List
<
PlainAccessConfig
>
plainAccessConfigs
;
@Override
public
void
checkFields
()
throws
RemotingCommandException
{
}
public
List
<
PlainAccessConfig
>
getPlainAccessConfigs
()
{
return
plainAccessConfigs
;
}
public
void
setPlainAccessConfigs
(
List
<
PlainAccessConfig
>
plainAccessConfigs
)
{
this
.
plainAccessConfigs
=
plainAccessConfigs
;
}
}
docs/cn/acl/user_guide.md
浏览文件 @
a3954b45
...
...
@@ -152,6 +152,18 @@ sh mqadmin clusterAclConfigVersion -n 192.168.1.2:9876 -c DefaultCluster
| c | eg:DefaultCluster | 指定集群名称(与broker地址二选一) |
| b | eg:192.168.12.134:10911 | 指定broker地址(与集群名称二选一) |
### 7.5 查询集群/Broker的ACL配置文件全部内容
该命令的示例如下:
sh mqadmin getAccessConfigSubCommand -n 192.168.1.2:9876 -c DefaultCluster
说明:如果指定的是集群名称,则会在集群中各个broker节点执行该命令;否则会在单个broker节点执行该命令。
| 参数 | 取值 | 含义 |
| --- | --- | --- |
| n | eg:192.168.1.2:9876 | namesrv地址(必填) |
| c | eg:DefaultCluster | 指定集群名称(与broker地址二选一) |
| b | eg:192.168.12.134:10911 | 指定broker地址(与集群名称二选一) |
**特别注意**
开启Acl鉴权认证后导致Master/Slave和Dledger模式下Broker同步数据异常的问题,
在社区[4.5.1]版本中已经修复,具体的PR链接为:https://github.com/apache/rocketmq/pull/1149;
\ No newline at end of file
tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
浏览文件 @
a3954b45
...
...
@@ -25,6 +25,7 @@ import org.apache.rocketmq.client.ClientConfig;
import
org.apache.rocketmq.client.QueryResult
;
import
org.apache.rocketmq.client.exception.MQBrokerException
;
import
org.apache.rocketmq.client.exception.MQClientException
;
import
org.apache.rocketmq.common.AclConfig
;
import
org.apache.rocketmq.common.MixAll
;
import
org.apache.rocketmq.common.PlainAccessConfig
;
import
org.apache.rocketmq.common.TopicConfig
;
...
...
@@ -183,6 +184,11 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
return
defaultMQAdminExtImpl
.
examineBrokerClusterAclVersionInfo
(
addr
);
}
@Override
public
AclConfig
examineBrokerClusterAclConfig
(
String
addr
)
throws
RemotingException
,
MQBrokerException
,
InterruptedException
,
MQClientException
{
return
defaultMQAdminExtImpl
.
examineBrokerClusterAclConfig
(
addr
);
}
@Override
public
void
createAndUpdateSubscriptionGroupConfig
(
String
addr
,
SubscriptionGroupConfig
config
)
throws
RemotingException
,
...
...
tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
浏览文件 @
a3954b45
...
...
@@ -41,6 +41,7 @@ import org.apache.rocketmq.common.MixAll;
import
org.apache.rocketmq.common.PlainAccessConfig
;
import
org.apache.rocketmq.common.ServiceState
;
import
org.apache.rocketmq.common.TopicConfig
;
import
org.apache.rocketmq.common.AclConfig
;
import
org.apache.rocketmq.common.UtilAll
;
import
org.apache.rocketmq.common.admin.ConsumeStats
;
import
org.apache.rocketmq.common.admin.OffsetWrapper
;
...
...
@@ -202,6 +203,12 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
return
this
.
mqClientInstance
.
getMQClientAPIImpl
().
getBrokerClusterAclInfo
(
addr
,
timeoutMillis
);
}
@Override
public
AclConfig
examineBrokerClusterAclConfig
(
String
addr
)
throws
RemotingException
,
MQBrokerException
,
InterruptedException
,
MQClientException
{
return
this
.
mqClientInstance
.
getMQClientAPIImpl
().
getBrokerClusterConfig
(
addr
,
timeoutMillis
);
}
@Override
public
void
createAndUpdateSubscriptionGroupConfig
(
String
addr
,
SubscriptionGroupConfig
config
)
throws
RemotingException
,
...
...
tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
浏览文件 @
a3954b45
...
...
@@ -24,6 +24,7 @@ import java.util.Set;
import
org.apache.rocketmq.client.MQAdmin
;
import
org.apache.rocketmq.client.exception.MQBrokerException
;
import
org.apache.rocketmq.client.exception.MQClientException
;
import
org.apache.rocketmq.common.AclConfig
;
import
org.apache.rocketmq.common.PlainAccessConfig
;
import
org.apache.rocketmq.common.TopicConfig
;
import
org.apache.rocketmq.common.admin.ConsumeStats
;
...
...
@@ -82,6 +83,9 @@ public interface MQAdminExt extends MQAdmin {
ClusterAclVersionInfo
examineBrokerClusterAclVersionInfo
(
final
String
addr
)
throws
RemotingException
,
MQBrokerException
,
InterruptedException
,
MQClientException
;
AclConfig
examineBrokerClusterAclConfig
(
final
String
addr
)
throws
RemotingException
,
MQBrokerException
,
InterruptedException
,
MQClientException
;
void
createAndUpdateSubscriptionGroupConfig
(
final
String
addr
,
final
SubscriptionGroupConfig
config
)
throws
RemotingException
,
MQBrokerException
,
InterruptedException
,
MQClientException
;
...
...
tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
浏览文件 @
a3954b45
...
...
@@ -31,6 +31,7 @@ import org.apache.rocketmq.remoting.RPCHook;
import
org.apache.rocketmq.remoting.protocol.RemotingCommand
;
import
org.apache.rocketmq.srvutil.ServerUtil
;
import
org.apache.rocketmq.tools.command.acl.ClusterAclConfigVersionListSubCommand
;
import
org.apache.rocketmq.tools.command.acl.GetAccessConfigSubCommand
;
import
org.apache.rocketmq.tools.command.acl.DeleteAccessConfigSubCommand
;
import
org.apache.rocketmq.tools.command.acl.UpdateAccessConfigSubCommand
;
import
org.apache.rocketmq.tools.command.acl.UpdateGlobalWhiteAddrSubCommand
;
...
...
@@ -209,6 +210,7 @@ public class MQAdminStartup {
initCommand
(
new
DeleteAccessConfigSubCommand
());
initCommand
(
new
ClusterAclConfigVersionListSubCommand
());
initCommand
(
new
UpdateGlobalWhiteAddrSubCommand
());
initCommand
(
new
GetAccessConfigSubCommand
());
}
private
static
void
initLogback
()
throws
JoranException
{
...
...
tools/src/main/java/org/apache/rocketmq/tools/command/acl/GetAccessConfigSubCommand.java
0 → 100644
浏览文件 @
a3954b45
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.tools.command.acl
;
import
org.apache.commons.cli.CommandLine
;
import
org.apache.commons.cli.Option
;
import
org.apache.commons.cli.OptionGroup
;
import
org.apache.commons.cli.Options
;
import
org.apache.rocketmq.client.exception.MQBrokerException
;
import
org.apache.rocketmq.client.exception.MQClientException
;
import
org.apache.rocketmq.common.AclConfig
;
import
org.apache.rocketmq.common.PlainAccessConfig
;
import
org.apache.rocketmq.remoting.RPCHook
;
import
org.apache.rocketmq.remoting.exception.RemotingException
;
import
org.apache.rocketmq.srvutil.ServerUtil
;
import
org.apache.rocketmq.tools.admin.DefaultMQAdminExt
;
import
org.apache.rocketmq.tools.command.CommandUtil
;
import
org.apache.rocketmq.tools.command.SubCommand
;
import
org.apache.rocketmq.tools.command.SubCommandException
;
import
java.lang.reflect.Field
;
import
java.util.List
;
import
java.util.Set
;
public
class
GetAccessConfigSubCommand
implements
SubCommand
{
@Override
public
String
commandName
()
{
return
"getAccessConfigSubCommand"
;
}
@Override
public
String
commandDesc
()
{
return
"List all of acl config information in cluster"
;
}
@Override
public
Options
buildCommandlineOptions
(
Options
options
)
{
OptionGroup
optionGroup
=
new
OptionGroup
();
Option
opt
=
new
Option
(
"b"
,
"brokerAddr"
,
true
,
"query acl config version for which broker"
);
optionGroup
.
addOption
(
opt
);
opt
=
new
Option
(
"c"
,
"clusterName"
,
true
,
"query acl config version for specified cluster"
);
optionGroup
.
addOption
(
opt
);
optionGroup
.
setRequired
(
true
);
options
.
addOptionGroup
(
optionGroup
);
return
options
;
}
@Override
public
void
execute
(
CommandLine
commandLine
,
Options
options
,
RPCHook
rpcHook
)
throws
SubCommandException
{
DefaultMQAdminExt
defaultMQAdminExt
=
new
DefaultMQAdminExt
(
rpcHook
);
defaultMQAdminExt
.
setInstanceName
(
Long
.
toString
(
System
.
currentTimeMillis
()));
try
{
if
(
commandLine
.
hasOption
(
'b'
))
{
String
addr
=
commandLine
.
getOptionValue
(
'b'
).
trim
();
defaultMQAdminExt
.
start
();
printClusterBaseInfo
(
defaultMQAdminExt
,
addr
);
return
;
}
else
if
(
commandLine
.
hasOption
(
'c'
))
{
String
clusterName
=
commandLine
.
getOptionValue
(
'c'
).
trim
();
defaultMQAdminExt
.
start
();
Set
<
String
>
masterSet
=
CommandUtil
.
fetchMasterAddrByClusterName
(
defaultMQAdminExt
,
clusterName
);
for
(
String
addr
:
masterSet
)
{
printClusterBaseInfo
(
defaultMQAdminExt
,
addr
);
}
return
;
}
ServerUtil
.
printCommandLineHelp
(
"mqadmin "
+
this
.
commandName
(),
options
);
}
catch
(
Exception
e
)
{
throw
new
SubCommandException
(
this
.
getClass
().
getSimpleName
()
+
" command failed"
,
e
);
}
finally
{
defaultMQAdminExt
.
shutdown
();
}
}
private
void
printClusterBaseInfo
(
final
DefaultMQAdminExt
defaultMQAdminExt
,
final
String
addr
)
throws
InterruptedException
,
MQBrokerException
,
RemotingException
,
MQClientException
,
IllegalAccessException
{
AclConfig
aclConfig
=
defaultMQAdminExt
.
examineBrokerClusterAclConfig
(
addr
);
List
<
PlainAccessConfig
>
configs
=
aclConfig
.
getPlainAccessConfigs
();
List
<
String
>
globalWhiteAddrs
=
aclConfig
.
getGlobalWhiteAddrs
();
System
.
out
.
printf
(
"\n"
);
System
.
out
.
printf
(
"%-20s: %s\n"
,
"globalWhiteRemoteAddresses"
,
globalWhiteAddrs
.
toString
());
System
.
out
.
printf
(
"\n"
);
System
.
out
.
printf
(
"accounts:\n"
);
if
(
configs
!=
null
&&
configs
.
size
()
>
0
)
{
for
(
PlainAccessConfig
config
:
configs
)
{
Field
[]
fields
=
config
.
getClass
().
getDeclaredFields
();
for
(
Field
field
:
fields
)
{
field
.
setAccessible
(
true
);
if
(
field
.
get
(
config
)
!=
null
)
{
System
.
out
.
printf
(
"%-1s %-18s: %s\n"
,
""
,
field
.
getName
(),
field
.
get
(
config
).
toString
());
}
else
{
System
.
out
.
printf
(
"%-1s %-18s: %s\n"
,
""
,
field
.
getName
(),
""
);
}
}
System
.
out
.
printf
(
"\n"
);
}
}
}
}
tools/src/test/java/org/apache/rocketmq/tools/command/acl/GetAccessConfigSubCommandTest.java
0 → 100644
浏览文件 @
a3954b45
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.tools.command.acl
;
import
org.apache.commons.cli.*
;
import
org.apache.rocketmq.srvutil.ServerUtil
;
import
org.junit.Test
;
import
static
org
.
assertj
.
core
.
api
.
Assertions
.
assertThat
;
public
class
GetAccessConfigSubCommandTest
{
@Test
public
void
testExecute
()
{
GetAccessConfigSubCommand
cmd
=
new
GetAccessConfigSubCommand
();
Options
options
=
ServerUtil
.
buildCommandlineOptions
(
new
Options
());
String
[]
subargs
=
new
String
[]
{
"-c default-cluster"
};
final
CommandLine
commandLine
=
ServerUtil
.
parseCmdLine
(
"mqadmin "
+
cmd
.
commandName
(),
subargs
,
cmd
.
buildCommandlineOptions
(
options
),
new
PosixParser
());
assertThat
(
commandLine
.
getOptionValue
(
'c'
).
trim
()).
isEqualTo
(
"default-cluster"
);
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录