diff --git a/acl/src/main/java/org/apache/rocketmq/acl/plug/PlainAclPlugEngine.java b/acl/src/main/java/org/apache/rocketmq/acl/plug/PlainAclPlugEngine.java index c8fb4c5009a96f1b4fd408b15b2a34861eefd417..50aab379d32eb673217942ec5f8823e2c914334b 100644 --- a/acl/src/main/java/org/apache/rocketmq/acl/plug/PlainAclPlugEngine.java +++ b/acl/src/main/java/org/apache/rocketmq/acl/plug/PlainAclPlugEngine.java @@ -56,6 +56,8 @@ public class PlainAclPlugEngine { private Class accessContralAnalysisClass = RequestCode.class; + private boolean isWatchStart; + public PlainAclPlugEngine() { initialize(); watch(); @@ -95,6 +97,7 @@ public class PlainAclPlugEngine { if ("transport.yml".equals(event.context().toString()) && (StandardWatchEventKinds.ENTRY_MODIFY.equals(event.kind()) || StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind()))) { log.info("transprot.yml make a difference change is : ", event.toString()); + PlainAclPlugEngine.this.cleanAuthenticationInfo(); initialize(); } } @@ -114,11 +117,30 @@ public class PlainAclPlugEngine { }; watcherServcie.start(); log.info("succeed start watcherServcie"); + this.isWatchStart = true; } catch (IOException e) { log.error(e.getMessage(), e); } } + private void handleAccessControl(AccessControl accessControl) { + if (accessControl instanceof BrokerAccessControl) { + BrokerAccessControl brokerAccessControl = (BrokerAccessControl) accessControl; + if (brokerAccessControl.isAdmin()) { + brokerAccessControl.setUpdateAndCreateSubscriptiongroup(true); + brokerAccessControl.setDeleteSubscriptiongroup(true); + brokerAccessControl.setUpdateAndCreateTopic(true); + brokerAccessControl.setDeleteTopicInbroker(true); + brokerAccessControl.setUpdateBrokerConfig(true); + } + } + } + + void cleanAuthenticationInfo() { + accessControlMap.clear(); + authenticationInfo = null; + } + public void setAccessControl(AccessControl accessControl) throws AclPlugRuntimeException { if (accessControl.getAccount() == null || accessControl.getPassword() == null || accessControl.getAccount().length() <= 6 || accessControl.getPassword().length() <= 6) { @@ -127,6 +149,7 @@ public class PlainAclPlugEngine { accessControl.getAccount(), accessControl.getPassword())); } try { + handleAccessControl(accessControl); NetaddressStrategy netaddressStrategy = netaddressStrategyFactory.getNetaddressStrategy(accessControl); List accessControlAddressList = accessControlMap.get(accessControl.getAccount()); if (accessControlAddressList == null) { @@ -198,13 +221,6 @@ public class PlainAclPlugEngine { } if (transport.getList() != null || transport.getList().size() > 0) { for (BrokerAccessControl accessControl : transport.getList()) { - if (accessControl.isAdmin()) { - accessControl.setUpdateAndCreateSubscriptiongroup(true); - accessControl.setDeleteSubscriptiongroup(true); - accessControl.setUpdateAndCreateTopic(true); - accessControl.setDeleteTopicInbroker(true); - accessControl.setUpdateBrokerConfig(true); - } this.setAccessControl(accessControl); } } @@ -244,6 +260,10 @@ public class PlainAclPlugEngine { return true; } + public boolean isWatchStart() { + return isWatchStart; + } + public static class AccessContralAnalysis { private Map, Map> classTocodeAndMentod = new HashMap<>(); diff --git a/acl/src/test/java/org/apache/rocketmq/acl/plug/PlainAclPlugEngineTest.java b/acl/src/test/java/org/apache/rocketmq/acl/plug/PlainAclPlugEngineTest.java index 8797c4959301bc758068d64ee4b8a5bc1029e19b..3d3f254e35d6e0eab162d19159286bcb215963b0 100644 --- a/acl/src/test/java/org/apache/rocketmq/acl/plug/PlainAclPlugEngineTest.java +++ b/acl/src/test/java/org/apache/rocketmq/acl/plug/PlainAclPlugEngineTest.java @@ -16,6 +16,8 @@ */ package org.apache.rocketmq.acl.plug; +import java.io.File; +import java.io.FileWriter; import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; @@ -46,38 +48,50 @@ public class PlainAclPlugEngineTest { AuthenticationInfo authenticationInfo; - BrokerAccessControl BrokerAccessControl; + BrokerAccessControl brokerAccessControl; + + Set adminCode = new HashSet<>(); @Before public void init() throws NoSuchFieldException, SecurityException, IOException { + // UPDATE_AND_CREATE_TOPIC + adminCode.add(17); + // UPDATE_BROKER_CONFIG + adminCode.add(25); + // DELETE_TOPIC_IN_BROKER + adminCode.add(215); + // UPDATE_AND_CREATE_SUBSCRIPTIONGROUP + adminCode.add(200); + // DELETE_SUBSCRIPTIONGROUP + adminCode.add(207); accessContralAnalysis.analysisClass(RequestCode.class); - BrokerAccessControl = new BrokerAccessControl(); + brokerAccessControl = new BrokerAccessControl(); // 321 - BrokerAccessControl.setQueryConsumeQueue(false); + brokerAccessControl.setQueryConsumeQueue(false); Set permitSendTopic = new HashSet<>(); permitSendTopic.add("permitSendTopic"); - BrokerAccessControl.setPermitSendTopic(permitSendTopic); + brokerAccessControl.setPermitSendTopic(permitSendTopic); Set noPermitSendTopic = new HashSet<>(); noPermitSendTopic.add("noPermitSendTopic"); - BrokerAccessControl.setNoPermitSendTopic(noPermitSendTopic); + brokerAccessControl.setNoPermitSendTopic(noPermitSendTopic); Set permitPullTopic = new HashSet<>(); permitPullTopic.add("permitPullTopic"); - BrokerAccessControl.setPermitPullTopic(permitPullTopic); + brokerAccessControl.setPermitPullTopic(permitPullTopic); Set noPermitPullTopic = new HashSet<>(); noPermitPullTopic.add("noPermitPullTopic"); - BrokerAccessControl.setNoPermitPullTopic(noPermitPullTopic); + brokerAccessControl.setNoPermitPullTopic(noPermitPullTopic); AccessContralAnalysis accessContralAnalysis = new AccessContralAnalysis(); accessContralAnalysis.analysisClass(RequestCode.class); - Map map = accessContralAnalysis.analysis(BrokerAccessControl); + Map map = accessContralAnalysis.analysis(brokerAccessControl); - authenticationInfo = new AuthenticationInfo(map, BrokerAccessControl, NetaddressStrategyFactory.NULL_NET_ADDRESS_STRATEGY); + authenticationInfo = new AuthenticationInfo(map, brokerAccessControl, NetaddressStrategyFactory.NULL_NET_ADDRESS_STRATEGY); System.setProperty("rocketmq.home.dir", "src/test/resources"); plainAclPlugEngine = new PlainAclPlugEngine(); @@ -280,7 +294,7 @@ public class PlainAclPlugEngineTest { Assert.assertFalse(isReturn); Set permitSendTopic = new HashSet<>(); - BrokerAccessControl.setPermitSendTopic(permitSendTopic); + brokerAccessControl.setPermitSendTopic(permitSendTopic); isReturn = plainAclPlugEngine.authentication(authenticationInfo, accessControl, authenticationResult); Assert.assertTrue(isReturn); @@ -288,11 +302,111 @@ public class PlainAclPlugEngineTest { isReturn = plainAclPlugEngine.authentication(authenticationInfo, accessControl, authenticationResult); Assert.assertFalse(isReturn); - BrokerAccessControl.setPermitPullTopic(permitSendTopic); + brokerAccessControl.setPermitPullTopic(permitSendTopic); isReturn = plainAclPlugEngine.authentication(authenticationInfo, accessControl, authenticationResult); Assert.assertTrue(isReturn); } + @Test + public void adminBrokerAccessControlTest() { + BrokerAccessControl admin = new BrokerAccessControl(); + admin.setAccount("adminTest"); + admin.setPassword("adminTest"); + admin.setNetaddress("127.0.0.1"); + plainAclPlugEngine.setAccessControl(admin); + Assert.assertFalse(admin.isUpdateAndCreateTopic()); + + admin.setAdmin(true); + plainAclPlugEngine.setAccessControl(admin); + Assert.assertTrue(admin.isUpdateAndCreateTopic()); + } + + @Test + public void adminEachCheckAuthentication() { + BrokerAccessControl accessControl = new BrokerAccessControl(); + accessControl.setAccount("RocketMQ1"); + accessControl.setPassword("1234567"); + accessControl.setNetaddress("127.0.0.1"); + plainAclPlugEngine.setAccessControl(accessControl); + for (Integer code : adminCode) { + accessControl.setCode(code); + AuthenticationResult authenticationResult = plainAclPlugEngine.eachCheckAuthentication(accessControl); + Assert.assertFalse(authenticationResult.isSucceed()); + + } + plainAclPlugEngine.cleanAuthenticationInfo(); + accessControl.setAdmin(true); + plainAclPlugEngine.setAccessControl(accessControl); + for (Integer code : adminCode) { + accessControl.setCode(code); + AuthenticationResult authenticationResult = plainAclPlugEngine.eachCheckAuthentication(accessControl); + Assert.assertTrue(authenticationResult.isSucceed()); + } + } + + @Test + public void cleanAuthenticationInfoTest() { + plainAclPlugEngine.setAccessControl(accessControl); + accessControl.setCode(202); + AuthenticationResult authenticationResult = plainAclPlugEngine.eachCheckAuthentication(accessControl); + Assert.assertTrue(authenticationResult.isSucceed()); + plainAclPlugEngine.cleanAuthenticationInfo(); + authenticationResult = plainAclPlugEngine.eachCheckAuthentication(accessControl); + Assert.assertFalse(authenticationResult.isSucceed()); + } + + @Test + public void isWatchStartTest() { + PlainAclPlugEngine plainAclPlugEngine = new PlainAclPlugEngine(); + Assert.assertTrue(plainAclPlugEngine.isWatchStart()); + System.setProperty("java.version", "1.6.11"); + plainAclPlugEngine = new PlainAclPlugEngine(); + Assert.assertFalse(plainAclPlugEngine.isWatchStart()); + } + + @Test + public void watchTest() throws IOException { + System.setProperty("rocketmq.home.dir", "src/test/resources/watch"); + File file = new File("src/test/resources/watch/conf"); + file.mkdirs(); + File transport = new File("src/test/resources/watch/conf/transport.yml"); + transport.createNewFile(); + + FileWriter writer = new FileWriter(transport); + writer.write("list:\r\n"); + writer.write("- account: rokcetmq\r\n"); + writer.write(" password: aliyun11\r\n"); + writer.write(" netaddress: 127.0.0.1\r\n"); + writer.flush(); + writer.close(); + PlainAclPlugEngine plainAclPlugEngine = new PlainAclPlugEngine(); + accessControl.setCode(203); + AuthenticationResult authenticationResult = plainAclPlugEngine.eachCheckAuthentication(accessControl); + Assert.assertTrue(authenticationResult.isSucceed()); + + writer = new FileWriter(new File("src/test/resources/watch/conf/transport.yml"), true); + writer.write("- account: rokcet1\r\n"); + writer.write(" password: aliyun1\r\n"); + writer.write(" netaddress: 127.0.0.1\r\n"); + writer.flush(); + writer.close(); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + accessControlTwo.setCode(203); + authenticationResult = plainAclPlugEngine.eachCheckAuthentication(accessControlTwo); + Assert.assertTrue(authenticationResult.isSucceed()); + + transport.delete(); + file.delete(); + file = new File("src/test/resources/watch"); + file.delete(); + + } + @Test public void analysisTest() { BrokerAccessControl accessControl = new BrokerAccessControl(); @@ -304,6 +418,9 @@ public class PlainAclPlugEngineTest { while (it.hasNext()) { Entry e = it.next(); if (!e.getValue()) { + if (adminCode.contains(e.getKey())) { + continue; + } Assert.assertEquals(e.getKey(), Integer.valueOf(10)); num++; } diff --git a/broker/src/main/resources/META-INF/service/org.apache.rocketmq.acl.AccessValidator b/broker/src/main/resources/META-INF/service/org.apache.rocketmq.acl.AccessValidator deleted file mode 100644 index 422b1e7bcbcc93fcbe838b881383ddbecd0ad934..0000000000000000000000000000000000000000 --- a/broker/src/main/resources/META-INF/service/org.apache.rocketmq.acl.AccessValidator +++ /dev/null @@ -1 +0,0 @@ -org.apache.rocketmq.acl.PlainAccessValidator \ No newline at end of file