未验证 提交 d99862a8 编写于 作者: J jonnxu 提交者: GitHub

[ISSUE #1263] GetAccessConfigSubCommand for ACL configuration

...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
package org.apache.rocketmq.acl; package org.apache.rocketmq.acl;
import java.util.List; import java.util.List;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
...@@ -66,4 +67,10 @@ public interface AccessValidator { ...@@ -66,4 +67,10 @@ public interface AccessValidator {
* @return * @return
*/ */
boolean updateGlobalWhiteAddrsConfig(List<String> globalWhiteAddrsList); boolean updateGlobalWhiteAddrsConfig(List<String> globalWhiteAddrsList);
/**
* get broker cluster acl config information
* @return
*/
AclConfig getAllAclConfig();
} }
...@@ -26,6 +26,7 @@ import org.apache.rocketmq.acl.common.AclException; ...@@ -26,6 +26,7 @@ import org.apache.rocketmq.acl.common.AclException;
import org.apache.rocketmq.acl.common.AclUtils; import org.apache.rocketmq.acl.common.AclUtils;
import org.apache.rocketmq.acl.common.Permission; import org.apache.rocketmq.acl.common.Permission;
import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader; import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader;
...@@ -155,4 +156,7 @@ public class PlainAccessValidator implements AccessValidator { ...@@ -155,4 +156,7 @@ public class PlainAccessValidator implements AccessValidator {
return aclPlugEngine.updateGlobalWhiteAddrsConfig(globalWhiteAddrsList); return aclPlugEngine.updateGlobalWhiteAddrsConfig(globalWhiteAddrsList);
} }
@Override public AclConfig getAllAclConfig() {
return aclPlugEngine.getAllAclConfig();
}
} }
...@@ -30,6 +30,7 @@ import org.apache.rocketmq.acl.common.AclConstants; ...@@ -30,6 +30,7 @@ import org.apache.rocketmq.acl.common.AclConstants;
import org.apache.rocketmq.acl.common.AclException; import org.apache.rocketmq.acl.common.AclException;
import org.apache.rocketmq.acl.common.AclUtils; import org.apache.rocketmq.acl.common.AclUtils;
import org.apache.rocketmq.acl.common.Permission; import org.apache.rocketmq.acl.common.Permission;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.PlainAccessConfig;
...@@ -270,6 +271,28 @@ public class PlainPermissionManager { ...@@ -270,6 +271,28 @@ public class PlainPermissionManager {
return false; return false;
} }
public AclConfig getAllAclConfig() {
AclConfig aclConfig = new AclConfig();
List<PlainAccessConfig> configs = new ArrayList<>();
List<String> whiteAddrs = new ArrayList<>();
JSONObject plainAclConfData = AclUtils.getYamlDataObject(fileHome + File.separator + fileName,
JSONObject.class);
if (plainAclConfData == null || plainAclConfData.isEmpty()) {
throw new AclException(String.format("%s file is not data", fileHome + File.separator + fileName));
}
JSONArray globalWhiteAddrs = plainAclConfData.getJSONArray(AclConstants.CONFIG_GLOBAL_WHITE_ADDRS);
if (globalWhiteAddrs != null && !globalWhiteAddrs.isEmpty()) {
whiteAddrs = globalWhiteAddrs.toJavaList(String.class);
}
JSONArray accounts = plainAclConfData.getJSONArray(AclConstants.CONFIG_ACCOUNTS);
if (accounts != null && !accounts.isEmpty()) {
configs = accounts.toJavaList(PlainAccessConfig.class);
}
aclConfig.setGlobalWhiteAddrs(whiteAddrs);
aclConfig.setPlainAccessConfigs(configs);
return aclConfig;
}
private void watch() { private void watch() {
try { try {
String watchFilePath = fileHome + fileName; String watchFilePath = fileHome + fileName;
......
...@@ -29,6 +29,7 @@ import org.apache.rocketmq.acl.common.AclConstants; ...@@ -29,6 +29,7 @@ import org.apache.rocketmq.acl.common.AclConstants;
import org.apache.rocketmq.acl.common.AclException; import org.apache.rocketmq.acl.common.AclException;
import org.apache.rocketmq.acl.common.AclUtils; import org.apache.rocketmq.acl.common.AclUtils;
import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.header.*; import org.apache.rocketmq.common.protocol.header.*;
...@@ -561,4 +562,12 @@ public class PlainAccessValidatorTest { ...@@ -561,4 +562,12 @@ public class PlainAccessValidatorTest {
AclUtils.writeDataObject(targetFileName, backUpAclConfigMap); AclUtils.writeDataObject(targetFileName, backUpAclConfigMap);
} }
@Test
public void getAllAclConfigTest(){
PlainAccessValidator plainAccessValidator = new PlainAccessValidator();
AclConfig aclConfig = plainAccessValidator.getAllAclConfig();
Assert.assertEquals(aclConfig.getGlobalWhiteAddrs().size(), 2);
Assert.assertEquals(aclConfig.getPlainAccessConfigs().size(), 2);
}
} }
...@@ -42,6 +42,7 @@ import org.apache.rocketmq.common.MQVersion; ...@@ -42,6 +42,7 @@ import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.OffsetWrapper; import org.apache.rocketmq.common.admin.OffsetWrapper;
...@@ -52,6 +53,8 @@ import org.apache.rocketmq.common.protocol.header.CreateAccessConfigRequestHeade ...@@ -52,6 +53,8 @@ import org.apache.rocketmq.common.protocol.header.CreateAccessConfigRequestHeade
import org.apache.rocketmq.common.protocol.header.DeleteAccessConfigRequestHeader; import org.apache.rocketmq.common.protocol.header.DeleteAccessConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHeader; import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHeader;
import org.apache.rocketmq.common.protocol.header.UpdateGlobalWhiteAddrsConfigRequestHeader; import org.apache.rocketmq.common.protocol.header.UpdateGlobalWhiteAddrsConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetBrokerClusterAclConfigResponseHeader;
import org.apache.rocketmq.common.protocol.header.GetBrokerClusterAclConfigResponseBody;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageAccessor;
...@@ -226,6 +229,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { ...@@ -226,6 +229,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return updateGlobalWhiteAddrsConfig(ctx, request); return updateGlobalWhiteAddrsConfig(ctx, request);
case RequestCode.RESUME_CHECK_HALF_MESSAGE: case RequestCode.RESUME_CHECK_HALF_MESSAGE:
return resumeCheckHalfMessage(ctx, request); return resumeCheckHalfMessage(ctx, request);
case RequestCode.GET_BROKER_CLUSTER_ACL_CONFIG:
return getBrokerClusterAclConfig(ctx, request);
default: default:
break; break;
} }
...@@ -428,6 +433,27 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { ...@@ -428,6 +433,27 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return null; return null;
} }
private RemotingCommand getBrokerClusterAclConfig(ChannelHandlerContext ctx, RemotingCommand request) {
final RemotingCommand response = RemotingCommand.createResponseCommand(GetBrokerClusterAclConfigResponseHeader.class);
try {
AccessValidator accessValidator = this.brokerController.getAccessValidatorMap().get(PlainAccessValidator.class);
GetBrokerClusterAclConfigResponseBody body = new GetBrokerClusterAclConfigResponseBody();
AclConfig aclConfig = accessValidator.getAllAclConfig();
body.setGlobalWhiteAddrs(aclConfig.getGlobalWhiteAddrs());
body.setPlainAccessConfigs(aclConfig.getPlainAccessConfigs());
response.setCode(ResponseCode.SUCCESS);
response.setBody(body.encode());
response.setRemark(null);
return response;
} catch (Exception e) {
log.error("Failed to generate a proper getBrokerClusterAclConfig response", e);
}
return null;
}
private RemotingCommand getAllTopicConfig(ChannelHandlerContext ctx, RemotingCommand request) { private RemotingCommand getAllTopicConfig(ChannelHandlerContext ctx, RemotingCommand request) {
final RemotingCommand response = RemotingCommand.createResponseCommand(GetAllTopicConfigResponseHeader.class); final RemotingCommand response = RemotingCommand.createResponseCommand(GetAllTopicConfigResponseHeader.class);
// final GetAllTopicConfigResponseHeader responseHeader = // final GetAllTopicConfigResponseHeader responseHeader =
......
...@@ -49,6 +49,7 @@ import org.apache.rocketmq.common.MQVersion; ...@@ -49,6 +49,7 @@ import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.admin.TopicStatsTable;
...@@ -96,6 +97,7 @@ import org.apache.rocketmq.common.protocol.header.DeleteSubscriptionGroupRequest ...@@ -96,6 +97,7 @@ import org.apache.rocketmq.common.protocol.header.DeleteSubscriptionGroupRequest
import org.apache.rocketmq.common.protocol.header.DeleteTopicRequestHeader; import org.apache.rocketmq.common.protocol.header.DeleteTopicRequestHeader;
import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader; import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHeader; import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHeader;
import org.apache.rocketmq.common.protocol.header.GetBrokerClusterAclConfigResponseBody;
import org.apache.rocketmq.common.protocol.header.GetConsumeStatsInBrokerHeader; import org.apache.rocketmq.common.protocol.header.GetConsumeStatsInBrokerHeader;
import org.apache.rocketmq.common.protocol.header.GetConsumeStatsRequestHeader; import org.apache.rocketmq.common.protocol.header.GetConsumeStatsRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetConsumerConnectionListRequestHeader; import org.apache.rocketmq.common.protocol.header.GetConsumerConnectionListRequestHeader;
...@@ -390,6 +392,30 @@ public class MQClientAPIImpl { ...@@ -390,6 +392,30 @@ public class MQClientAPIImpl {
} }
public AclConfig getBrokerClusterConfig(final String addr, final long timeoutMillis) throws RemotingCommandException, InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_ACL_CONFIG, null);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
if (response.getBody() != null) {
GetBrokerClusterAclConfigResponseBody body =
GetBrokerClusterAclConfigResponseBody.decode(response.getBody(), GetBrokerClusterAclConfigResponseBody.class);
AclConfig aclConfig = new AclConfig();
aclConfig.setGlobalWhiteAddrs(body.getGlobalWhiteAddrs());
aclConfig.setPlainAccessConfigs(body.getPlainAccessConfigs());
return aclConfig;
}
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark());
}
public SendResult sendMessage( public SendResult sendMessage(
final String addr, final String addr,
final String brokerName, final String brokerName,
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common;
import java.util.List;
public class AclConfig {
private List<String> globalWhiteAddrs;
private List<PlainAccessConfig> plainAccessConfigs;
public List<String> getGlobalWhiteAddrs() {
return globalWhiteAddrs;
}
public void setGlobalWhiteAddrs(List<String> globalWhiteAddrs) {
this.globalWhiteAddrs = globalWhiteAddrs;
}
public List<PlainAccessConfig> getPlainAccessConfigs() {
return plainAccessConfigs;
}
public void setPlainAccessConfigs(List<PlainAccessConfig> plainAccessConfigs) {
this.plainAccessConfigs = plainAccessConfigs;
}
}
...@@ -78,6 +78,8 @@ public class RequestCode { ...@@ -78,6 +78,8 @@ public class RequestCode {
public static final int UPDATE_GLOBAL_WHITE_ADDRS_CONFIG = 53; public static final int UPDATE_GLOBAL_WHITE_ADDRS_CONFIG = 53;
public static final int GET_BROKER_CLUSTER_ACL_CONFIG = 54;
public static final int PUT_KV_CONFIG = 100; public static final int PUT_KV_CONFIG = 100;
public static final int GET_KV_CONFIG = 101; public static final int GET_KV_CONFIG = 101;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import java.util.List;
public class GetBrokerClusterAclConfigResponseBody extends RemotingSerializable {
private List<String> globalWhiteAddrs;
private List<PlainAccessConfig> plainAccessConfigs;
public List<String> getGlobalWhiteAddrs() {
return globalWhiteAddrs;
}
public void setGlobalWhiteAddrs(List<String> globalWhiteAddrs) {
this.globalWhiteAddrs = globalWhiteAddrs;
}
public List<PlainAccessConfig> getPlainAccessConfigs() {
return plainAccessConfigs;
}
public void setPlainAccessConfigs(List<PlainAccessConfig> plainAccessConfigs) {
this.plainAccessConfigs = plainAccessConfigs;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import java.util.List;
public class GetBrokerClusterAclConfigResponseHeader implements CommandCustomHeader {
@CFNotNull
private List<PlainAccessConfig> plainAccessConfigs;
@Override
public void checkFields() throws RemotingCommandException {
}
public List<PlainAccessConfig> getPlainAccessConfigs() {
return plainAccessConfigs;
}
public void setPlainAccessConfigs(List<PlainAccessConfig> plainAccessConfigs) {
this.plainAccessConfigs = plainAccessConfigs;
}
}
...@@ -152,6 +152,18 @@ sh mqadmin clusterAclConfigVersion -n 192.168.1.2:9876 -c DefaultCluster ...@@ -152,6 +152,18 @@ sh mqadmin clusterAclConfigVersion -n 192.168.1.2:9876 -c DefaultCluster
| c | eg:DefaultCluster | 指定集群名称(与broker地址二选一) | | c | eg:DefaultCluster | 指定集群名称(与broker地址二选一) |
| b | eg:192.168.12.134:10911 | 指定broker地址(与集群名称二选一) | | b | eg:192.168.12.134:10911 | 指定broker地址(与集群名称二选一) |
### 7.5 查询集群/Broker的ACL配置文件全部内容
该命令的示例如下:
sh mqadmin getAccessConfigSubCommand -n 192.168.1.2:9876 -c DefaultCluster
说明:如果指定的是集群名称,则会在集群中各个broker节点执行该命令;否则会在单个broker节点执行该命令。
| 参数 | 取值 | 含义 |
| --- | --- | --- |
| n | eg:192.168.1.2:9876 | namesrv地址(必填) |
| c | eg:DefaultCluster | 指定集群名称(与broker地址二选一) |
| b | eg:192.168.12.134:10911 | 指定broker地址(与集群名称二选一) |
**特别注意**开启Acl鉴权认证后导致Master/Slave和Dledger模式下Broker同步数据异常的问题, **特别注意**开启Acl鉴权认证后导致Master/Slave和Dledger模式下Broker同步数据异常的问题,
在社区[4.5.1]版本中已经修复,具体的PR链接为:https://github.com/apache/rocketmq/pull/1149; 在社区[4.5.1]版本中已经修复,具体的PR链接为:https://github.com/apache/rocketmq/pull/1149;
\ No newline at end of file
...@@ -25,6 +25,7 @@ import org.apache.rocketmq.client.ClientConfig; ...@@ -25,6 +25,7 @@ import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
...@@ -183,6 +184,11 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { ...@@ -183,6 +184,11 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
return defaultMQAdminExtImpl.examineBrokerClusterAclVersionInfo(addr); return defaultMQAdminExtImpl.examineBrokerClusterAclVersionInfo(addr);
} }
@Override public AclConfig examineBrokerClusterAclConfig(
String addr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
return defaultMQAdminExtImpl.examineBrokerClusterAclConfig(addr);
}
@Override @Override
public void createAndUpdateSubscriptionGroupConfig(String addr, public void createAndUpdateSubscriptionGroupConfig(String addr,
SubscriptionGroupConfig config) throws RemotingException, SubscriptionGroupConfig config) throws RemotingException,
......
...@@ -41,6 +41,7 @@ import org.apache.rocketmq.common.MixAll; ...@@ -41,6 +41,7 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.ServiceState; import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.OffsetWrapper; import org.apache.rocketmq.common.admin.OffsetWrapper;
...@@ -202,6 +203,12 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { ...@@ -202,6 +203,12 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
return this.mqClientInstance.getMQClientAPIImpl().getBrokerClusterAclInfo(addr, timeoutMillis); return this.mqClientInstance.getMQClientAPIImpl().getBrokerClusterAclInfo(addr, timeoutMillis);
} }
@Override
public AclConfig examineBrokerClusterAclConfig(
String addr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
return this.mqClientInstance.getMQClientAPIImpl().getBrokerClusterConfig(addr, timeoutMillis);
}
@Override @Override
public void createAndUpdateSubscriptionGroupConfig(String addr, public void createAndUpdateSubscriptionGroupConfig(String addr,
SubscriptionGroupConfig config) throws RemotingException, SubscriptionGroupConfig config) throws RemotingException,
......
...@@ -24,6 +24,7 @@ import java.util.Set; ...@@ -24,6 +24,7 @@ import java.util.Set;
import org.apache.rocketmq.client.MQAdmin; import org.apache.rocketmq.client.MQAdmin;
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.ConsumeStats;
...@@ -82,6 +83,9 @@ public interface MQAdminExt extends MQAdmin { ...@@ -82,6 +83,9 @@ public interface MQAdminExt extends MQAdmin {
ClusterAclVersionInfo examineBrokerClusterAclVersionInfo(final String addr) throws RemotingException, MQBrokerException, ClusterAclVersionInfo examineBrokerClusterAclVersionInfo(final String addr) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException; InterruptedException, MQClientException;
AclConfig examineBrokerClusterAclConfig(final String addr) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException;
void createAndUpdateSubscriptionGroupConfig(final String addr, void createAndUpdateSubscriptionGroupConfig(final String addr,
final SubscriptionGroupConfig config) throws RemotingException, final SubscriptionGroupConfig config) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException; MQBrokerException, InterruptedException, MQClientException;
......
...@@ -31,6 +31,7 @@ import org.apache.rocketmq.remoting.RPCHook; ...@@ -31,6 +31,7 @@ import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.srvutil.ServerUtil; import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.command.acl.ClusterAclConfigVersionListSubCommand; import org.apache.rocketmq.tools.command.acl.ClusterAclConfigVersionListSubCommand;
import org.apache.rocketmq.tools.command.acl.GetAccessConfigSubCommand;
import org.apache.rocketmq.tools.command.acl.DeleteAccessConfigSubCommand; import org.apache.rocketmq.tools.command.acl.DeleteAccessConfigSubCommand;
import org.apache.rocketmq.tools.command.acl.UpdateAccessConfigSubCommand; import org.apache.rocketmq.tools.command.acl.UpdateAccessConfigSubCommand;
import org.apache.rocketmq.tools.command.acl.UpdateGlobalWhiteAddrSubCommand; import org.apache.rocketmq.tools.command.acl.UpdateGlobalWhiteAddrSubCommand;
...@@ -209,6 +210,7 @@ public class MQAdminStartup { ...@@ -209,6 +210,7 @@ public class MQAdminStartup {
initCommand(new DeleteAccessConfigSubCommand()); initCommand(new DeleteAccessConfigSubCommand());
initCommand(new ClusterAclConfigVersionListSubCommand()); initCommand(new ClusterAclConfigVersionListSubCommand());
initCommand(new UpdateGlobalWhiteAddrSubCommand()); initCommand(new UpdateGlobalWhiteAddrSubCommand());
initCommand(new GetAccessConfigSubCommand());
} }
private static void initLogback() throws JoranException { private static void initLogback() throws JoranException {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.acl;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Set;
public class GetAccessConfigSubCommand implements SubCommand {
@Override public String commandName() {
return "getAccessConfigSubCommand";
}
@Override public String commandDesc() {
return "List all of acl config information in cluster";
}
@Override public Options buildCommandlineOptions(Options options) {
OptionGroup optionGroup = new OptionGroup();
Option opt = new Option("b", "brokerAddr", true, "query acl config version for which broker");
optionGroup.addOption(opt);
opt = new Option("c", "clusterName", true, "query acl config version for specified cluster");
optionGroup.addOption(opt);
optionGroup.setRequired(true);
options.addOptionGroup(optionGroup);
return options;
}
@Override public void execute(CommandLine commandLine, Options options,
RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
if (commandLine.hasOption('b')) {
String addr = commandLine.getOptionValue('b').trim();
defaultMQAdminExt.start();
printClusterBaseInfo(defaultMQAdminExt, addr);
return;
} else if (commandLine.hasOption('c')) {
String clusterName = commandLine.getOptionValue('c').trim();
defaultMQAdminExt.start();
Set<String> masterSet =
CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
for (String addr : masterSet) {
printClusterBaseInfo(defaultMQAdminExt, addr);
}
return;
}
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
}
private void printClusterBaseInfo(
final DefaultMQAdminExt defaultMQAdminExt, final String addr) throws
InterruptedException, MQBrokerException, RemotingException, MQClientException, IllegalAccessException {
AclConfig aclConfig = defaultMQAdminExt.examineBrokerClusterAclConfig(addr);
List<PlainAccessConfig> configs = aclConfig.getPlainAccessConfigs();
List<String> globalWhiteAddrs = aclConfig.getGlobalWhiteAddrs();
System.out.printf("\n");
System.out.printf("%-20s: %s\n", "globalWhiteRemoteAddresses", globalWhiteAddrs.toString());
System.out.printf("\n");
System.out.printf("accounts:\n");
if (configs != null && configs.size() > 0) {
for (PlainAccessConfig config : configs) {
Field[] fields = config.getClass().getDeclaredFields();
for (Field field : fields) {
field.setAccessible(true);
if (field.get(config) != null) {
System.out.printf("%-1s %-18s: %s\n", "", field.getName(), field.get(config).toString());
} else {
System.out.printf("%-1s %-18s: %s\n", "", field.getName(), "");
}
}
System.out.printf("\n");
}
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.acl;
import org.apache.commons.cli.*;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
public class GetAccessConfigSubCommandTest {
@Test
public void testExecute() {
GetAccessConfigSubCommand cmd = new GetAccessConfigSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-c default-cluster"};
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
assertThat(commandLine.getOptionValue('c').trim()).isEqualTo("default-cluster");
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册