diff --git a/acl/src/main/java/org/apache/rocketmq/acl/plug/BorkerAccessControl.java b/acl/src/main/java/org/apache/rocketmq/acl/plug/BrokerAccessControl.java similarity index 97% rename from acl/src/main/java/org/apache/rocketmq/acl/plug/BorkerAccessControl.java rename to acl/src/main/java/org/apache/rocketmq/acl/plug/BrokerAccessControl.java index 449c8d01dc88c4ce0808b89f3d7f25fcac6cc783..beb8539c0987accb1f4e9a79a6191e7eaf52d805 100644 --- a/acl/src/main/java/org/apache/rocketmq/acl/plug/BorkerAccessControl.java +++ b/acl/src/main/java/org/apache/rocketmq/acl/plug/BrokerAccessControl.java @@ -19,7 +19,9 @@ package org.apache.rocketmq.acl.plug; import java.util.HashSet; import java.util.Set; -public class BorkerAccessControl extends AccessControl { +public class BrokerAccessControl extends AccessControl { + + private boolean admin; private Set permitSendTopic = new HashSet<>(); private Set noPermitSendTopic = new HashSet<>(); @@ -54,13 +56,13 @@ public class BorkerAccessControl extends AccessControl { private boolean endTransaction = true; - private boolean updateAndCreateTopic = true; + private boolean updateAndCreateTopic = false; - private boolean deleteTopicInbroker = true; + private boolean deleteTopicInbroker = false; private boolean getAllTopicConfig = true; - private boolean updateBrokerConfig = true; + private boolean updateBrokerConfig = false; private boolean getBrokerConfig = true; @@ -78,11 +80,11 @@ public class BorkerAccessControl extends AccessControl { private boolean unlockBatchMQ = true; - private boolean updateAndCreateSubscriptiongroup = true; + private boolean updateAndCreateSubscriptiongroup = false; private boolean getAllSubscriptiongroupConfig = true; - private boolean deleteSubscriptiongroup = true; + private boolean deleteSubscriptiongroup = false; private boolean getTopicStatsInfo = true; @@ -124,8 +126,16 @@ public class BorkerAccessControl extends AccessControl { private boolean queryConsumeQueue = true; - public BorkerAccessControl() { + public BrokerAccessControl() { + + } + + public boolean isAdmin() { + return admin; + } + public void setAdmin(boolean admin) { + this.admin = admin; } public Set getPermitSendTopic() { diff --git a/acl/src/main/java/org/apache/rocketmq/acl/plug/PlainAclPlugEngine.java b/acl/src/main/java/org/apache/rocketmq/acl/plug/PlainAclPlugEngine.java index 580595ca4e23384c457a6d36e928647db5c2816e..c8fb4c5009a96f1b4fd408b15b2a34861eefd417 100644 --- a/acl/src/main/java/org/apache/rocketmq/acl/plug/PlainAclPlugEngine.java +++ b/acl/src/main/java/org/apache/rocketmq/acl/plug/PlainAclPlugEngine.java @@ -62,13 +62,13 @@ public class PlainAclPlugEngine { } public void initialize() { - BorkerAccessControlTransport accessControlTransport = AclUtils.getYamlDataObject(fileHome + "/conf/transport.yml", BorkerAccessControlTransport.class); + BrokerAccessControlTransport accessControlTransport = AclUtils.getYamlDataObject(fileHome + "/conf/transport.yml", BrokerAccessControlTransport.class); if (accessControlTransport == null) { throw new AclPlugRuntimeException("transport.yml file is no data"); } log.info("BorkerAccessControlTransport data is : ", accessControlTransport.toString()); accessContralAnalysis.analysisClass(accessContralAnalysisClass); - setBorkerAccessControlTransport(accessControlTransport); + setBrokerAccessControlTransport(accessControlTransport); } private void watch() { @@ -188,7 +188,7 @@ public class PlainAclPlugEngine { return authenticationResult; } - void setBorkerAccessControlTransport(BorkerAccessControlTransport transport) { + void setBrokerAccessControlTransport(BrokerAccessControlTransport transport) { if (transport.getOnlyNetAddress() == null && (transport.getList() == null || transport.getList().size() == 0)) { throw new AclPlugRuntimeException("onlyNetAddress and list can't be all empty"); } @@ -197,7 +197,14 @@ public class PlainAclPlugEngine { this.setNetaddressAccessControl(transport.getOnlyNetAddress()); } if (transport.getList() != null || transport.getList().size() > 0) { - for (AccessControl accessControl : transport.getList()) { + for (BrokerAccessControl accessControl : transport.getList()) { + if (accessControl.isAdmin()) { + accessControl.setUpdateAndCreateSubscriptiongroup(true); + accessControl.setDeleteSubscriptiongroup(true); + accessControl.setUpdateAndCreateTopic(true); + accessControl.setDeleteTopicInbroker(true); + accessControl.setUpdateBrokerConfig(true); + } this.setAccessControl(accessControl); } } @@ -210,10 +217,10 @@ public class PlainAclPlugEngine { authenticationResult.setResultString(String.format("code is %d Authentication failed", code)); return false; } - if (!(authenticationInfo.getAccessControl() instanceof BorkerAccessControl)) { + if (!(authenticationInfo.getAccessControl() instanceof BrokerAccessControl)) { return true; } - BorkerAccessControl borker = (BorkerAccessControl) authenticationInfo.getAccessControl(); + BrokerAccessControl borker = (BrokerAccessControl) authenticationInfo.getAccessControl(); String topicName = accessControl.getTopic(); if (code == 10 || code == 310 || code == 320) { if (borker.getPermitSendTopic().contains(topicName)) { @@ -264,6 +271,8 @@ public class PlainAclPlugEngine { codeAndField = new HashMap<>(); Field[] fields = clazz.getDeclaredFields(); for (Field field : fields) { + if ("admin".equals(field.getName())) + continue; if (!field.getType().equals(boolean.class)) continue; Integer code = fieldNameAndCode.get(field.getName().toLowerCase()); @@ -297,25 +306,25 @@ public class PlainAclPlugEngine { } - public static class BorkerAccessControlTransport { + public static class BrokerAccessControlTransport { - private BorkerAccessControl onlyNetAddress; + private BrokerAccessControl onlyNetAddress; - private List list; + private List list; - public BorkerAccessControl getOnlyNetAddress() { + public BrokerAccessControl getOnlyNetAddress() { return onlyNetAddress; } - public void setOnlyNetAddress(BorkerAccessControl onlyNetAddress) { + public void setOnlyNetAddress(BrokerAccessControl onlyNetAddress) { this.onlyNetAddress = onlyNetAddress; } - public List getList() { + public List getList() { return list; } - public void setList(List list) { + public void setList(List list) { this.list = list; } diff --git a/acl/src/test/java/org/apache/rocketmq/acl/plug/PlainAclPlugEngineTest.java b/acl/src/test/java/org/apache/rocketmq/acl/plug/PlainAclPlugEngineTest.java index 654cf423ae5fc9333e8ed1fda663505cd208bfd9..8797c4959301bc758068d64ee4b8a5bc1029e19b 100644 --- a/acl/src/test/java/org/apache/rocketmq/acl/plug/PlainAclPlugEngineTest.java +++ b/acl/src/test/java/org/apache/rocketmq/acl/plug/PlainAclPlugEngineTest.java @@ -25,7 +25,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import org.apache.rocketmq.acl.plug.PlainAclPlugEngine.AccessContralAnalysis; -import org.apache.rocketmq.acl.plug.PlainAclPlugEngine.BorkerAccessControlTransport; +import org.apache.rocketmq.acl.plug.PlainAclPlugEngine.BrokerAccessControlTransport; import org.apache.rocketmq.common.protocol.RequestCode; import org.junit.Assert; import org.junit.Before; @@ -46,50 +46,49 @@ public class PlainAclPlugEngineTest { AuthenticationInfo authenticationInfo; - BorkerAccessControl borkerAccessControl; + BrokerAccessControl BrokerAccessControl; @Before public void init() throws NoSuchFieldException, SecurityException, IOException { accessContralAnalysis.analysisClass(RequestCode.class); - borkerAccessControl = new BorkerAccessControl(); + BrokerAccessControl = new BrokerAccessControl(); // 321 - borkerAccessControl.setQueryConsumeQueue(false); + BrokerAccessControl.setQueryConsumeQueue(false); Set permitSendTopic = new HashSet<>(); permitSendTopic.add("permitSendTopic"); - borkerAccessControl.setPermitSendTopic(permitSendTopic); + BrokerAccessControl.setPermitSendTopic(permitSendTopic); Set noPermitSendTopic = new HashSet<>(); noPermitSendTopic.add("noPermitSendTopic"); - borkerAccessControl.setNoPermitSendTopic(noPermitSendTopic); + BrokerAccessControl.setNoPermitSendTopic(noPermitSendTopic); Set permitPullTopic = new HashSet<>(); permitPullTopic.add("permitPullTopic"); - borkerAccessControl.setPermitPullTopic(permitPullTopic); + BrokerAccessControl.setPermitPullTopic(permitPullTopic); Set noPermitPullTopic = new HashSet<>(); noPermitPullTopic.add("noPermitPullTopic"); - borkerAccessControl.setNoPermitPullTopic(noPermitPullTopic); + BrokerAccessControl.setNoPermitPullTopic(noPermitPullTopic); AccessContralAnalysis accessContralAnalysis = new AccessContralAnalysis(); accessContralAnalysis.analysisClass(RequestCode.class); - Map map = accessContralAnalysis.analysis(borkerAccessControl); + Map map = accessContralAnalysis.analysis(BrokerAccessControl); - authenticationInfo = new AuthenticationInfo(map, borkerAccessControl, NetaddressStrategyFactory.NULL_NET_ADDRESS_STRATEGY); + authenticationInfo = new AuthenticationInfo(map, BrokerAccessControl, NetaddressStrategyFactory.NULL_NET_ADDRESS_STRATEGY); System.setProperty("rocketmq.home.dir", "src/test/resources"); plainAclPlugEngine = new PlainAclPlugEngine(); - plainAclPlugEngine.initialize(); - accessControl = new BorkerAccessControl(); + accessControl = new BrokerAccessControl(); accessControl.setAccount("rokcetmq"); accessControl.setPassword("aliyun11"); accessControl.setNetaddress("127.0.0.1"); accessControl.setRecognition("127.0.0.1:1"); - accessControlTwo = new BorkerAccessControl(); + accessControlTwo = new BrokerAccessControl(); accessControlTwo.setAccount("rokcet1"); accessControlTwo.setPassword("aliyun1"); accessControlTwo.setNetaddress("127.0.0.1"); @@ -175,7 +174,7 @@ public class PlainAclPlugEngineTest { @Test public void setNetaddressAccessControl() { - AccessControl accessControl = new BorkerAccessControl(); + AccessControl accessControl = new BrokerAccessControl(); accessControl.setAccount("RocketMQ"); accessControl.setPassword("RocketMQ"); accessControl.setNetaddress("127.0.0.1"); @@ -197,21 +196,21 @@ public class PlainAclPlugEngineTest { } @Test(expected = AclPlugRuntimeException.class) - public void borkerAccessControlTransportTestNull() { - BorkerAccessControlTransport accessControlTransport = new BorkerAccessControlTransport(); - plainAclPlugEngine.setBorkerAccessControlTransport(accessControlTransport); + public void BrokerAccessControlTransportTestNull() { + BrokerAccessControlTransport accessControlTransport = new BrokerAccessControlTransport(); + plainAclPlugEngine.setBrokerAccessControlTransport(accessControlTransport); } @Test - public void borkerAccessControlTransportTest() { - BorkerAccessControlTransport accessControlTransport = new BorkerAccessControlTransport(); - List list = new ArrayList<>(); - list.add((BorkerAccessControl) this.accessControlTwo); - accessControlTransport.setOnlyNetAddress((BorkerAccessControl) this.accessControl); + public void BrokerAccessControlTransportTest() { + BrokerAccessControlTransport accessControlTransport = new BrokerAccessControlTransport(); + List list = new ArrayList<>(); + list.add((BrokerAccessControl) this.accessControlTwo); + accessControlTransport.setOnlyNetAddress((BrokerAccessControl) this.accessControl); accessControlTransport.setList(list); - plainAclPlugEngine.setBorkerAccessControlTransport(accessControlTransport); + plainAclPlugEngine.setBrokerAccessControlTransport(accessControlTransport); - AccessControl accessControl = new BorkerAccessControl(); + AccessControl accessControl = new BrokerAccessControl(); accessControl.setAccount("RocketMQ"); accessControl.setPassword("RocketMQ"); accessControl.setNetaddress("127.0.0.1"); @@ -281,7 +280,7 @@ public class PlainAclPlugEngineTest { Assert.assertFalse(isReturn); Set permitSendTopic = new HashSet<>(); - borkerAccessControl.setPermitSendTopic(permitSendTopic); + BrokerAccessControl.setPermitSendTopic(permitSendTopic); isReturn = plainAclPlugEngine.authentication(authenticationInfo, accessControl, authenticationResult); Assert.assertTrue(isReturn); @@ -289,14 +288,14 @@ public class PlainAclPlugEngineTest { isReturn = plainAclPlugEngine.authentication(authenticationInfo, accessControl, authenticationResult); Assert.assertFalse(isReturn); - borkerAccessControl.setPermitPullTopic(permitSendTopic); + BrokerAccessControl.setPermitPullTopic(permitSendTopic); isReturn = plainAclPlugEngine.authentication(authenticationInfo, accessControl, authenticationResult); Assert.assertTrue(isReturn); } @Test public void analysisTest() { - BorkerAccessControl accessControl = new BorkerAccessControl(); + BrokerAccessControl accessControl = new BrokerAccessControl(); accessControl.setSendMessage(false); Map map = accessContralAnalysis.analysis(accessControl); diff --git a/acl/src/test/resources/conf/transport.yml b/acl/src/test/resources/conf/transport.yml index 99d26fd8eb4427b7842ad75e7b84490baf7a4cf9..6b1450ef952c181abd03cb4c3840b3647271a4ae 100644 --- a/acl/src/test/resources/conf/transport.yml +++ b/acl/src/test/resources/conf/transport.yml @@ -22,6 +22,7 @@ list: - account: RocketMQ password: 1234567 netaddress: 192.0.0.* + admin: true permitSendTopic: - test1 - test2 diff --git a/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java b/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java index 8d4f23026d3621381cf2f816b58b480d00ab7ec6..ea8047a2ff12f077e59797587e7da6b9c79c949b 100644 --- a/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java +++ b/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java @@ -35,17 +35,7 @@ public class ServerUtil { new Option("n", "namesrvAddr", true, "Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876"); opt.setRequired(false); - options.addOption(opt); - - - opt = new Option("account", "account", true, "acl want the parameters"); - opt.setRequired(false); - options.addOption(opt); - - opt = new Option("password", "password", true, "acl want the parameters"); - opt.setRequired(false); - options.addOption(opt); - + options.addOption(opt); return options; } diff --git a/tools/pom.xml b/tools/pom.xml index dc0e256ed462186bf01bee17bd38d89be1e70a90..086c3e64c13ec767571e36b73747064726e1a2e5 100644 --- a/tools/pom.xml +++ b/tools/pom.xml @@ -15,7 +15,8 @@ limitations under the License. --> - + org.apache.rocketmq rocketmq-all @@ -60,5 +61,9 @@ org.apache.commons commons-lang3 + + org.yaml + snakeyaml + diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java index cf73f65c2500ced3d50cee6abc217b6779f01334..d1ce0f0ab7ebf1645bd69ab914c3ad3aab94492c 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java @@ -19,16 +19,13 @@ package org.apache.rocketmq.tools.command; import ch.qos.logback.classic.LoggerContext; import ch.qos.logback.classic.joran.JoranConfigurator; import ch.qos.logback.core.joran.spi.JoranException; -import java.io.BufferedInputStream; import java.io.File; import java.io.FileInputStream; -import java.io.FileNotFoundException; import java.io.IOException; -import java.io.InputStream; import java.util.ArrayList; import java.util.HashMap; import java.util.List; -import java.util.Properties; +import java.util.Map; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; @@ -82,6 +79,7 @@ import org.apache.rocketmq.tools.command.topic.UpdateOrderConfCommand; import org.apache.rocketmq.tools.command.topic.UpdateTopicPermSubCommand; import org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand; import org.slf4j.LoggerFactory; +import org.yaml.snakeyaml.Yaml; public class MQAdminStartup { protected static List subCommandList = new ArrayList(); @@ -218,7 +216,6 @@ public class MQAdminStartup { private static void printHelp() { System.out.printf("The most commonly used mqadmin commands are:%n"); - System.out.printf("ROCKETMQ_HOME Add tools.properties to the %ROCKETMQ_HOME%/conf/ directory or add -account xxxx -password xxxx Join when executing a command"); for (SubCommand cmd : subCommandList) { System.out.printf(" %-20s %s%n", cmd.commandName(), cmd.commandDesc()); } @@ -252,62 +249,63 @@ public class MQAdminStartup { } public static RPCHook getAclRPCHook(CommandLine commandLine) { - String account = null, password = null; - if (commandLine.hasOption("account")) { - account = commandLine.getOptionValue("account"); - password = commandLine.getOptionValue("password"); - } else { - String fileHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); - File file = new File(fileHome + "/conf/tools.properties"); - if (!file.exists()) { - System.out.printf("no find tools.properties , , Execution may fail without account andd password"); - System.out.printf("ROCKETMQ_HOME Add tools.properties to the %ROCKETMQ_HOME%/conf/ directory or add -account xxxx -password xxxx Join when executing a command"); - return null; - } - InputStream in = null; - try { - in = new BufferedInputStream(new FileInputStream(file)); - Properties properties = new Properties(); - properties.load(in); - account = properties.getProperty("account"); - password = properties.getProperty("password"); - } catch (FileNotFoundException e) { - e.printStackTrace(); - } catch (IOException e) { - e.printStackTrace(); - } finally { - if (in != null) { - try { - in.close(); - } catch (IOException e) { - e.printStackTrace(); - } + String fileHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); + File file = new File(fileHome + "/conf/tools.yml"); + if (!file.exists()) { + System.out.printf("file %s is not exist" , file.getPath()); + return null; + } + Yaml ymal = new Yaml(); + FileInputStream fis = null; + Map> map = null; + try { + fis = new FileInputStream(file); + map = ymal.loadAs(fis, Map.class); + } catch (Exception e) { + e.printStackTrace(); + } finally { + if (fis != null) { + try { + fis.close(); + } catch (IOException e) { + e.printStackTrace(); } } } - if (StringUtils.isNotBlank(account) && StringUtils.isNotBlank(password)) { - final String newAccount = account; - final String newPassword = password; - return new RPCHook() { + if (map == null || map.isEmpty()) { + System.out.printf("file %s is no data" , file.getPath()); + return null; + } - @Override - public void doBeforeRequest(String remoteAddr, RemotingCommand request) { + final Map> newMap = map; + return new RPCHook() { + + @Override + public void doBeforeRequest(String remoteAddr, RemotingCommand request) { + System.out.printf("remoteAddr is %s code %d \n" , remoteAddr , request.getCode() ); + String fastRemoteAddr = null; + if(remoteAddr != null) { + String[] ipAndPost = StringUtils.split(remoteAddr, ":"); + Integer fastPost = (Integer.valueOf(ipAndPost[1])+2); + fastRemoteAddr = ipAndPost[0] + ":" + fastPost.toString(); + } + Map map; + if ((map = newMap.get(remoteAddr)) != null ||(map = newMap.get(fastRemoteAddr)) != null || (map = newMap.get("all")) != null) { HashMap ext = request.getExtFields(); if (ext == null) { ext = new HashMap<>(); request.setExtFields(ext); } - ext.put("account", newAccount); - ext.put("password", newPassword); + ext.put("account", map.get("account").toString()); + ext.put("password", map.get("password").toString()); } + + } + + @Override + public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) { + } + }; - @Override - public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) { - } - }; - } - System.out.printf("account andd password data incorrectness , Execution may fail without account andd password"); - System.out.printf("ROCKETMQ_HOME Add tools.properties to the %ROCKETMQ_HOME%/conf/ directory or add -account xxxx -password xxxx Join when executing a command"); - return null; } }