diff --git a/acl-plug/pom.xml b/acl-plug/pom.xml index 3a86a6ab63b81df9a9f885950550bdb4ad056359..a4633f70a131f0adac4f50ac51d90e8be09c9322 100644 --- a/acl-plug/pom.xml +++ b/acl-plug/pom.xml @@ -1,8 +1,8 @@ + xmlns="http://maven.apache.org/POM/4.0.0"> 4.0.0 org.apache.rocketmq diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/AccessContralAnalysis.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/AccessContralAnalysis.java index 35cd6340c3d0123fc5fe0ad09c181c0d6513e6d9..64a9a2ad66294a42e435d7ba26e642c03e4f4c2b 100644 --- a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/AccessContralAnalysis.java +++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/AccessContralAnalysis.java @@ -22,13 +22,32 @@ import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.acl.plug.annotation.RequestCode; import org.apache.rocketmq.acl.plug.entity.AccessControl; +import org.apache.rocketmq.acl.plug.exception.AclPlugAccountAnalysisException; public class AccessContralAnalysis { private Map, Map> classTocodeAndMentod = new HashMap<>(); + private Map fieldNameAndCode = new HashMap<>(); + + public void analysisClass(Class clazz) { + Field[] fields = clazz.getDeclaredFields(); + try { + for(Field field : fields) { + if( field.getType().equals(int.class)) { + String name = StringUtils.replace(field.getName(), "_", "").toLowerCase(); + fieldNameAndCode.put(name, (Integer)field.get(null)); + } + + } + } catch (IllegalArgumentException | IllegalAccessException e) { + throw new AclPlugAccountAnalysisException(String.format("analysis on failure Class is %s", clazz.getName()), e); + } + } + public Map analysis(AccessControl accessControl) { Class clazz = accessControl.getClass(); Map codeAndField = classTocodeAndMentod.get(clazz); @@ -36,18 +55,19 @@ public class AccessContralAnalysis { codeAndField = new HashMap<>(); Field[] fields = clazz.getDeclaredFields(); for (Field field : fields) { - RequestCode requestCode = field.getAnnotation(RequestCode.class); - if (requestCode != null) { - int code = requestCode.code(); - if (codeAndField.containsKey(code)) { - - } else { - field.setAccessible(true); - codeAndField.put(code, field); - } - } + if(!field.getType().equals(boolean.class)) + continue; + Integer code = fieldNameAndCode.get(field.getName().toLowerCase()); + if(code == null) { + throw new AclPlugAccountAnalysisException(String.format("field nonexistent in code", field.getName())); + } + field.setAccessible( true ); + codeAndField.put(code, field); } + if(codeAndField.isEmpty()) { + throw new AclPlugAccountAnalysisException(String.format("AccessControl nonexistent code , name %s" , accessControl.getClass().getName())); + } classTocodeAndMentod.put(clazz, codeAndField); } Iterator> it = codeAndField.entrySet().iterator(); @@ -57,8 +77,8 @@ public class AccessContralAnalysis { Entry e = it.next(); authority.put(e.getKey(), (Boolean) e.getValue().get(accessControl)); } - } catch (IllegalArgumentException | IllegalAccessException e1) { - e1.printStackTrace(); + } catch (IllegalArgumentException | IllegalAccessException e) { + throw new AclPlugAccountAnalysisException(String.format("analysis on failure AccessControl is %s", AccessControl.class.getName()), e); } return authority; } diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/AclPlugController.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/AclPlugController.java index fc0a73b9d004323de8ba3c92e008490de8d8d816..d3781059dd1ca56538a4009589afeef531dbe07f 100644 --- a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/AclPlugController.java +++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/AclPlugController.java @@ -47,7 +47,9 @@ public class AclPlugController { } public void doChannelCloseEvent(String remoteAddr) { - aclPlugEngine.deleteLoginInfo(remoteAddr); + if (this.startSucceed) { + aclPlugEngine.deleteLoginInfo(remoteAddr); + } } public boolean isStartSucceed() { diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/AclRemotingServer.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/AclRemotingServer.java index 63f0b20bd2e53372decbf36d662cc4060adb7a4d..4eeb2a54c4f7bc8f97bebe6f09b3557713adb36c 100644 --- a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/AclRemotingServer.java +++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/AclRemotingServer.java @@ -16,14 +16,11 @@ */ package org.apache.rocketmq.acl.plug; -import org.apache.rocketmq.acl.plug.entity.AuthenticationInfo; import org.apache.rocketmq.acl.plug.entity.AuthenticationResult; import org.apache.rocketmq.acl.plug.entity.LoginOrRequestAccessControl; public interface AclRemotingServer { - public AuthenticationInfo login(); - public AuthenticationResult eachCheck(LoginOrRequestAccessControl accessControl); } diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/Authentication.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/Authentication.java index 7a2651de624a3553594c2aad71bd0b4d7f2d5999..283466b5befc4d37026e0912d6a5cdb0e647425f 100644 --- a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/Authentication.java +++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/Authentication.java @@ -45,7 +45,7 @@ public class Authentication { authenticationResult.setResultString(String.format("noPermitSendTopic include %s", topicName)); return false; } - return true; + return borker.getPermitSendTopic().isEmpty() ? true : false; } else if (code == 11) { if (borker.getPermitPullTopic().contains(topicName)) { return true; @@ -54,7 +54,7 @@ public class Authentication { authenticationResult.setResultString(String.format("noPermitPullTopic include %s", topicName)); return false; } - return true; + return borker.getPermitPullTopic().isEmpty() ? true : false; } return true; } diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/DefaultAclRemotingServerImpl.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/DefaultAclRemotingServerImpl.java index 117266e592bcb0602be3fc500dd688130c1115c8..325ffab8047301f40eeca5e276607c3b0e5d1a24 100644 --- a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/DefaultAclRemotingServerImpl.java +++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/DefaultAclRemotingServerImpl.java @@ -17,7 +17,6 @@ package org.apache.rocketmq.acl.plug; import org.apache.rocketmq.acl.plug.engine.AclPlugEngine; -import org.apache.rocketmq.acl.plug.entity.AuthenticationInfo; import org.apache.rocketmq.acl.plug.entity.AuthenticationResult; import org.apache.rocketmq.acl.plug.entity.LoginOrRequestAccessControl; import org.apache.rocketmq.acl.plug.exception.AclPlugAuthenticationException; @@ -32,12 +31,6 @@ public class DefaultAclRemotingServerImpl implements AclRemotingServer { this.aclPlugEngine = aclPlugEngine; } - @Override - public AuthenticationInfo login() { - - return null; - } - @Override public AuthenticationResult eachCheck(LoginOrRequestAccessControl accessControl) { AuthenticationResult authenticationResult = aclPlugEngine.eachCheckLoginAndAuthentication(accessControl); diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/AuthenticationInfoManagementAclPlugEngine.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/AuthenticationInfoManagementAclPlugEngine.java index 4c601abfd9f2fe6fd94e53707e58bc081d457876..73205416d3b41e2031e465b3d26d6e7ff0bac430 100644 --- a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/AuthenticationInfoManagementAclPlugEngine.java +++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/AuthenticationInfoManagementAclPlugEngine.java @@ -19,13 +19,13 @@ package org.apache.rocketmq.acl.plug.engine; import java.util.HashMap; import java.util.List; import java.util.Map; - import org.apache.rocketmq.acl.plug.AccessContralAnalysis; import org.apache.rocketmq.acl.plug.Authentication; import org.apache.rocketmq.acl.plug.entity.AccessControl; import org.apache.rocketmq.acl.plug.entity.AuthenticationInfo; import org.apache.rocketmq.acl.plug.entity.AuthenticationResult; import org.apache.rocketmq.acl.plug.entity.BorkerAccessControlTransport; +import org.apache.rocketmq.acl.plug.entity.ControllerParametersEntity; import org.apache.rocketmq.acl.plug.entity.LoginOrRequestAccessControl; import org.apache.rocketmq.acl.plug.exception.AclPlugAccountAnalysisException; import org.apache.rocketmq.acl.plug.strategy.NetaddressStrategy; @@ -48,7 +48,18 @@ public abstract class AuthenticationInfoManagementAclPlugEngine implements AclPl private Authentication authentication = new Authentication(); + ControllerParametersEntity controllerParametersEntity; + + public AuthenticationInfoManagementAclPlugEngine(ControllerParametersEntity controllerParametersEntity) { + this.controllerParametersEntity = controllerParametersEntity; + accessContralAnalysis.analysisClass(controllerParametersEntity.getAccessContralAnalysisClass()); + } + + public void setAccessControl(AccessControl accessControl) throws AclPlugAccountAnalysisException { + if (accessControl.getAccount() == null || accessControl.getPassword() == null || accessControl.getAccount().length() <= 6 || accessControl.getPassword().length() <= 6) { + throw new AclPlugAccountAnalysisException(String.format("The account password cannot be null and is longer than 6, account is %s password is %s", accessControl.getAccount(), accessControl.getPassword())); + } try { NetaddressStrategy netaddressStrategy = netaddressStrategyFactory.getNetaddressStrategy(accessControl); Map accessControlAddressMap = accessControlMap.get(accessControl.getAccount()); diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/LoginInfoAclPlugEngine.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/LoginInfoAclPlugEngine.java index 304c18f36350855f1f458e4caf0814a0d54eb96e..76412351d230153bb4f933a98a7a3e16f21f397f 100644 --- a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/LoginInfoAclPlugEngine.java +++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/LoginInfoAclPlugEngine.java @@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.acl.plug.entity.AccessControl; import org.apache.rocketmq.acl.plug.entity.AuthenticationInfo; import org.apache.rocketmq.acl.plug.entity.AuthenticationResult; +import org.apache.rocketmq.acl.plug.entity.ControllerParametersEntity; import org.apache.rocketmq.acl.plug.entity.LoginInfo; import org.apache.rocketmq.acl.plug.entity.LoginOrRequestAccessControl; @@ -29,6 +30,11 @@ public abstract class LoginInfoAclPlugEngine extends AuthenticationInfoManagemen private Map loginInfoMap = new ConcurrentHashMap<>(); + + public LoginInfoAclPlugEngine(ControllerParametersEntity controllerParametersEntity) { + super(controllerParametersEntity); + } + public LoginInfo getLoginInfo(AccessControl accessControl) { LoginInfo loginInfo = loginInfoMap.get(accessControl.getRecognition()); if (loginInfo == null) { @@ -51,9 +57,9 @@ public abstract class LoginInfoAclPlugEngine extends AuthenticationInfoManagemen protected AuthenticationInfo getAuthenticationInfo(LoginOrRequestAccessControl accessControl, AuthenticationResult authenticationResult) { - LoginInfo anthenticationInfo = getLoginInfo(accessControl); - if (anthenticationInfo != null && anthenticationInfo.getAuthenticationInfo() != null) { - return anthenticationInfo.getAuthenticationInfo(); + LoginInfo loginInfo = getLoginInfo(accessControl); + if (loginInfo != null && loginInfo.getAuthenticationInfo() != null) { + return loginInfo.getAuthenticationInfo(); } authenticationResult.setResultString("Login information does not exist, Please check login, password, IP"); return null; diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/PlainAclPlugEngine.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/PlainAclPlugEngine.java index dd7acbf79977b29fd18d48f16cd42e67547193b5..7e4ede4a47132200d5770b94b4c29339b63568e6 100644 --- a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/PlainAclPlugEngine.java +++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/PlainAclPlugEngine.java @@ -19,7 +19,6 @@ package org.apache.rocketmq.acl.plug.engine; import java.io.File; import java.io.FileInputStream; import java.io.IOException; - import org.apache.rocketmq.acl.plug.entity.BorkerAccessControlTransport; import org.apache.rocketmq.acl.plug.entity.ControllerParametersEntity; import org.apache.rocketmq.acl.plug.exception.AclPlugAccountAnalysisException; @@ -29,9 +28,8 @@ public class PlainAclPlugEngine extends LoginInfoAclPlugEngine { private ControllerParametersEntity controllerParametersEntity; - public PlainAclPlugEngine( - ControllerParametersEntity controllerParametersEntity) throws AclPlugAccountAnalysisException { - this.controllerParametersEntity = controllerParametersEntity; + public PlainAclPlugEngine(ControllerParametersEntity controllerParametersEntity) throws AclPlugAccountAnalysisException { + super(controllerParametersEntity); init(); } diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/AuthenticationInfo.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/AuthenticationInfo.java index c4b9f7071ecd40baa92f55d5c13c601bef9e763b..981bef855332e6fe8cbf7451636e901ce742f337 100644 --- a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/AuthenticationInfo.java +++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/AuthenticationInfo.java @@ -19,7 +19,6 @@ package org.apache.rocketmq.acl.plug.entity; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; - import org.apache.rocketmq.acl.plug.strategy.NetaddressStrategy; public class AuthenticationInfo { diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/BorkerAccessControl.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/BorkerAccessControl.java index 9de76fba9f9bbff6d1974617fca057f07c957197..0446ca002291a45cf08477fc930e465c00be9e70 100644 --- a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/BorkerAccessControl.java +++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/BorkerAccessControl.java @@ -18,170 +18,117 @@ package org.apache.rocketmq.acl.plug.entity; import java.util.HashSet; import java.util.Set; - import org.apache.rocketmq.acl.plug.annotation.RequestCode; public class BorkerAccessControl extends AccessControl { - public BorkerAccessControl() { - - } - private Set permitSendTopic = new HashSet<>(); - private Set noPermitSendTopic = new HashSet<>(); - private Set permitPullTopic = new HashSet<>(); - private Set noPermitPullTopic = new HashSet<>(); - @RequestCode(code = 10) private boolean sendMessage = true; - - @RequestCode(code = 310) + private boolean sendMessageV2 = true; - @RequestCode(code = 320) private boolean sendBatchMessage = true; - @RequestCode(code = 36) private boolean consumerSendMsgBack = true; - @RequestCode(code = 11) private boolean pullMessage = true; - @RequestCode(code = 12) private boolean queryMessage = true; - @RequestCode(code = 33) private boolean viewMessageById = true; - @RequestCode(code = 34) private boolean heartBeat = true; - @RequestCode(code = 35) private boolean unregisterClient = true; - @RequestCode(code = 46) private boolean checkClientConfig = true; - @RequestCode(code = 38) private boolean getConsumerListByGroup = true; - @RequestCode(code = 15) private boolean updateConsumerOffset = true; - @RequestCode(code = 14) private boolean queryConsumerOffset = true; - @RequestCode(code = 37) private boolean endTransaction = true; - @RequestCode(code = 17) private boolean updateAndCreateTopic = true; - @RequestCode(code = 215) private boolean deleteTopicInbroker = true; - @RequestCode(code = 21) private boolean getAllTopicConfig = true; - @RequestCode(code = 25) private boolean updateBrokerConfig = true; - @RequestCode(code = 26) private boolean getBrokerConfig = true; - @RequestCode(code = 29) private boolean searchOffsetByTimestamp = true; - @RequestCode(code = 30) private boolean getMaxOffset = true; - @RequestCode(code = 31) private boolean getMixOffset = true; - @RequestCode(code = 32) private boolean getEarliestMsgStoretime = true; - @RequestCode(code = 28) private boolean getBrokerRuntimeInfo = true; - @RequestCode(code = 41) private boolean lockBatchMQ = true; - @RequestCode(code = 42) private boolean unlockBatchMQ = true; - @RequestCode(code = 200) private boolean updateAndCreteSubscriptiongroup = true; - @RequestCode(code = 201) private boolean getAllSubscriptiongroupConfig = true; - @RequestCode(code = 207) private boolean deleteSubscriptiongroup = true; - @RequestCode(code = 202) private boolean getTopicStatsInfo = true; - @RequestCode(code = 203) private boolean getConsumerConnectionList = true; - @RequestCode(code = 204) private boolean getProducerConnectionList = true; - @RequestCode(code = 208) private boolean getConsumeStats = true; - @RequestCode(code = 43) private boolean getAllConsumerOffset = true; - @RequestCode(code = 25) private boolean getAllDelayOffset = true; - @RequestCode(code = 222) private boolean invokeBrokerToresetOffset = true; - @RequestCode(code = 300) private boolean queryTopicConsumByWho = true; - @RequestCode(code = 301) private boolean registerFilterServer = true; - - @RequestCode(code = 303) + private boolean queryConsumeTimeSpan = true; - @RequestCode(code = 305) private boolean getSystemTopicListFromBroker = true; - @RequestCode(code = 306) private boolean cleanExpiredConsumequeue = true; - @RequestCode(code = 316) private boolean cleanUnusedTopic = true; - @RequestCode(code = 307) private boolean getConsumerRunningInfo = true; - @RequestCode(code = 308) private boolean queryCorrectionOffset = true; - @RequestCode(code = 309) private boolean consumeMessageDirectly = true; - @RequestCode(code = 314) private boolean cloneGroupOffset = true; - @RequestCode(code = 315) private boolean viewBrokerStatsData = true; - @RequestCode(code = 317) private boolean getBrokerConsumeStats = true; - @RequestCode(code = 321) private boolean queryConsumeQueue = true; + public BorkerAccessControl() { + + } + public Set getPermitSendTopic() { return permitSendTopic; } diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/ControllerParametersEntity.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/ControllerParametersEntity.java index 2d515477ad03be7db3055e8d032780d79813958c..9187db88f40cc35657d3a3b339e514710d5f5a5a 100644 --- a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/ControllerParametersEntity.java +++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/ControllerParametersEntity.java @@ -16,10 +16,14 @@ */ package org.apache.rocketmq.acl.plug.entity; +import org.apache.rocketmq.common.protocol.RequestCode; + public class ControllerParametersEntity { private String fileHome; + private Class accessContralAnalysisClass = RequestCode.class; + public String getFileHome() { return fileHome; } @@ -28,11 +32,21 @@ public class ControllerParametersEntity { this.fileHome = fileHome; } - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("ControllerParametersEntity [fileHome=").append(fileHome).append("]"); - return builder.toString(); - } + + public Class getAccessContralAnalysisClass() { + return accessContralAnalysisClass; + } + + public void setAccessContralAnalysisClass(Class accessContralAnalysisClass) { + this.accessContralAnalysisClass = accessContralAnalysisClass; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("ControllerParametersEntity [fileHome=").append(fileHome).append(", accessContralAnalysisClass=") + .append(accessContralAnalysisClass).append("]"); + return builder.toString(); + } } diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/strategy/MultipleNetaddressStrategy.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/strategy/MultipleNetaddressStrategy.java index 557cabc7de4b44d6262fd8910dba0e202a339ba4..fd49cc862eb602d4d51843feb53708373bcacae7 100644 --- a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/strategy/MultipleNetaddressStrategy.java +++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/strategy/MultipleNetaddressStrategy.java @@ -18,7 +18,6 @@ package org.apache.rocketmq.acl.plug.strategy; import java.util.HashSet; import java.util.Set; - import org.apache.rocketmq.acl.plug.entity.AccessControl; public class MultipleNetaddressStrategy extends AbstractNetaddressStrategy { diff --git a/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AccessContralAnalysisTest.java b/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AccessContralAnalysisTest.java index 06a5472536d4341026a71e9420aac52e0917e51e..2e44077e5c17afb939f3a7be0c1db6ba857dfac2 100644 --- a/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AccessContralAnalysisTest.java +++ b/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AccessContralAnalysisTest.java @@ -1,18 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.rocketmq.acl.plug; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; +import org.apache.rocketmq.acl.plug.entity.AccessControl; import org.apache.rocketmq.acl.plug.entity.BorkerAccessControl; +import org.apache.rocketmq.acl.plug.exception.AclPlugAccountAnalysisException; +import org.apache.rocketmq.common.protocol.RequestCode; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; public class AccessContralAnalysisTest { + AccessContralAnalysis accessContralAnalysis = new AccessContralAnalysis(); + + @Before + public void init() { + accessContralAnalysis.analysisClass(RequestCode.class); + } + @Test public void analysisTest() { - AccessContralAnalysis accessContralAnalysis = new AccessContralAnalysis(); BorkerAccessControl accessControl = new BorkerAccessControl(); accessControl.setSendMessage(false); Map map = accessContralAnalysis.analysis(accessControl); @@ -27,7 +53,13 @@ public class AccessContralAnalysisTest { } } Assert.assertEquals(num, 1); - + } + + + @Test(expected=AclPlugAccountAnalysisException.class) + public void analysisExceptionTest(){ + AccessControl accessControl = new AccessControl(); + accessContralAnalysis.analysis(accessControl); } } diff --git a/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AclPlugControllerTest.java b/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AclPlugControllerTest.java index da50ac14005b192f3d4dc03a9b904ee3cfb82633..223cbc7c25d6ff415a3458bd3b77db33b751d5ae 100644 --- a/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AclPlugControllerTest.java +++ b/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AclPlugControllerTest.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.rocketmq.acl.plug; public class AclPlugControllerTest { diff --git a/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AclUtilsTest.java b/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AclUtilsTest.java index d65752fbee090003a85098d8175bba6a7eb5d77f..806d18089497b135026ef237df31982994fc8e53 100644 --- a/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AclUtilsTest.java +++ b/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AclUtilsTest.java @@ -1,9 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.rocketmq.acl.plug; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; - import org.apache.commons.lang3.StringUtils; import org.junit.Assert; import org.junit.Test; diff --git a/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AuthenticationTest.java b/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AuthenticationTest.java index c425315b30adc07e8c78744d35f37a5f4b0502eb..fb1f6472674a30bbcd9173abc48a70cc5c843add 100644 --- a/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AuthenticationTest.java +++ b/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AuthenticationTest.java @@ -1,9 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.rocketmq.acl.plug; import java.util.HashSet; import java.util.Map; import java.util.Set; - import org.apache.rocketmq.acl.plug.entity.AuthenticationInfo; import org.apache.rocketmq.acl.plug.entity.AuthenticationResult; import org.apache.rocketmq.acl.plug.entity.BorkerAccessControl; @@ -18,11 +33,16 @@ public class AuthenticationTest { Authentication authentication = new Authentication(); AuthenticationInfo authenticationInfo; + + BorkerAccessControl borkerAccessControl; + + AuthenticationResult authenticationResult = new AuthenticationResult(); + LoginOrRequestAccessControl loginOrRequestAccessControl = new LoginOrRequestAccessControl(); @Before public void init() { OneNetaddressStrategy netaddressStrategy = new OneNetaddressStrategy("127.0.0.1"); - BorkerAccessControl borkerAccessControl = new BorkerAccessControl(); + borkerAccessControl = new BorkerAccessControl(); //321 borkerAccessControl.setQueryConsumeQueue(false); @@ -51,8 +71,7 @@ public class AuthenticationTest { @Test public void authenticationTest() { - AuthenticationResult authenticationResult = new AuthenticationResult(); - LoginOrRequestAccessControl loginOrRequestAccessControl = new LoginOrRequestAccessControl(); + loginOrRequestAccessControl.setCode(317); boolean isReturn = authentication.authentication(authenticationInfo, loginOrRequestAccessControl, authenticationResult); @@ -81,7 +100,7 @@ public class AuthenticationTest { loginOrRequestAccessControl.setTopic("nopermitSendTopic"); isReturn = authentication.authentication(authenticationInfo, loginOrRequestAccessControl, authenticationResult); - Assert.assertTrue(isReturn); + Assert.assertFalse(isReturn); loginOrRequestAccessControl.setCode(11); loginOrRequestAccessControl.setTopic("permitPullTopic"); @@ -94,7 +113,29 @@ public class AuthenticationTest { loginOrRequestAccessControl.setTopic("nopermitPullTopic"); isReturn = authentication.authentication(authenticationInfo, loginOrRequestAccessControl, authenticationResult); - Assert.assertTrue(isReturn); + Assert.assertFalse(isReturn); } + + @Test + public void isEmptyTest() { + loginOrRequestAccessControl.setCode(10); + loginOrRequestAccessControl.setTopic("absentTopic"); + boolean isReturn = authentication.authentication(authenticationInfo, loginOrRequestAccessControl, authenticationResult); + Assert.assertFalse(isReturn); + + Set permitSendTopic = new HashSet<>(); + borkerAccessControl.setPermitSendTopic(permitSendTopic); + isReturn = authentication.authentication(authenticationInfo, loginOrRequestAccessControl, authenticationResult); + Assert.assertTrue(isReturn); + + loginOrRequestAccessControl.setCode(11); + isReturn = authentication.authentication(authenticationInfo, loginOrRequestAccessControl, authenticationResult); + Assert.assertFalse(isReturn); + + borkerAccessControl.setPermitPullTopic(permitSendTopic); + isReturn = authentication.authentication(authenticationInfo, loginOrRequestAccessControl, authenticationResult); + Assert.assertTrue(isReturn); + } + } diff --git a/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/engine/PlainAclPlugEngineTest.java b/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/engine/PlainAclPlugEngineTest.java index d126f19f666d2f4eb2401da4efc5978877f2c8da..4098466016747b6deb8e04a6f75f45b101ae95cf 100644 --- a/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/engine/PlainAclPlugEngineTest.java +++ b/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/engine/PlainAclPlugEngineTest.java @@ -1,14 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.rocketmq.acl.plug.engine; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; -import java.lang.reflect.Field; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; - import org.apache.rocketmq.acl.plug.entity.AccessControl; import org.apache.rocketmq.acl.plug.entity.AuthenticationInfo; import org.apache.rocketmq.acl.plug.entity.AuthenticationResult; @@ -54,13 +68,13 @@ public class PlainAclPlugEngineTest { accessControl = new BorkerAccessControl(); accessControl.setAccount("rokcetmq"); - accessControl.setPassword("aliyun"); + accessControl.setPassword("aliyun11"); accessControl.setNetaddress("127.0.0.1"); accessControl.setRecognition("127.0.0.1:1"); accessControlTwo = new BorkerAccessControl(); - accessControlTwo.setAccount("rokcet"); - accessControlTwo.setPassword("aliyun"); + accessControlTwo.setAccount("rokcet1"); + accessControlTwo.setPassword("aliyun1"); accessControlTwo.setNetaddress("127.0.0.1"); accessControlTwo.setRecognition("127.0.0.1:2"); @@ -69,6 +83,31 @@ public class PlainAclPlugEngineTest { } + + @Test(expected = AclPlugAccountAnalysisException.class) + public void accountNullTest() { + accessControl.setAccount(null); + plainAclPlugEngine.setAccessControl(accessControl); + } + + @Test(expected = AclPlugAccountAnalysisException.class) + public void accountThanTest() { + accessControl.setAccount("123"); + plainAclPlugEngine.setAccessControl(accessControl); + } + + @Test(expected = AclPlugAccountAnalysisException.class) + public void passWordtNullTest() { + accessControl.setAccount(null); + plainAclPlugEngine.setAccessControl(accessControl); + } + + @Test(expected = AclPlugAccountAnalysisException.class) + public void passWordThanTest() { + accessControl.setAccount("123"); + plainAclPlugEngine.setAccessControl(accessControl); + } + @Test(expected = AclPlugAccountAnalysisException.class) public void testPlainAclPlugEngineInit() { ControllerParametersEntity controllerParametersEntity = new ControllerParametersEntity(); @@ -88,7 +127,7 @@ public class PlainAclPlugEngineTest { AccessControl testAccessControl = new AccessControl(); testAccessControl.setAccount("rokcetmq"); - testAccessControl.setPassword("aliyun"); + testAccessControl.setPassword("aliyun11"); testAccessControl.setNetaddress("127.0.0.1"); testAccessControl.setRecognition("127.0.0.1:1"); @@ -97,7 +136,7 @@ public class PlainAclPlugEngineTest { Assert.assertNull(authenticationInfo); testAccessControl.setAccount("rokcetmq"); - testAccessControl.setPassword("1"); + testAccessControl.setPassword("1234567"); authenticationInfo = aclPlugEngine.getAccessControl(testAccessControl); Assert.assertNull(authenticationInfo); @@ -128,6 +167,8 @@ public class PlainAclPlugEngineTest { public void setNetaddressAccessControl() { AuthenticationInfoManagementAclPlugEngine aclPlugEngine = (AuthenticationInfoManagementAclPlugEngine) plainAclPlugEngine; AccessControl accessControl = new BorkerAccessControl(); + accessControl.setAccount("RocketMQ"); + accessControl.setPassword("RocketMQ"); accessControl.setNetaddress("127.0.0.1"); aclPlugEngine.setAccessControl(accessControl); aclPlugEngine.setNetaddressAccessControl(accessControl); @@ -162,6 +203,8 @@ public class PlainAclPlugEngineTest { AuthenticationInfoManagementAclPlugEngine aclPlugEngine = (AuthenticationInfoManagementAclPlugEngine) plainAclPlugEngine; AccessControl accessControl = new BorkerAccessControl(); + accessControl.setAccount("RocketMQ"); + accessControl.setPassword("RocketMQ"); accessControl.setNetaddress("127.0.0.1"); aclPlugEngine.setAccessControl(accessControl); AuthenticationInfo authenticationInfo = aclPlugEngine.getAccessControl(accessControl); @@ -201,7 +244,7 @@ public class PlainAclPlugEngineTest { public void getAuthenticationInfo() { LoginOrRequestAccessControl loginOrRequestAccessControl = new LoginOrRequestAccessControl(); loginOrRequestAccessControl.setAccount("rokcetmq"); - loginOrRequestAccessControl.setPassword("aliyun"); + loginOrRequestAccessControl.setPassword("aliyun11"); loginOrRequestAccessControl.setNetaddress("127.0.0.1"); loginOrRequestAccessControl.setRecognition("127.0.0.1:1"); diff --git a/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/strategy/NetaddressStrategyTest.java b/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/strategy/NetaddressStrategyTest.java index bf9101b200f637aad4fe6b892a982866aecef5dd..f670b31ec48f7180f719d3018aa59fec11c3871a 100644 --- a/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/strategy/NetaddressStrategyTest.java +++ b/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/strategy/NetaddressStrategyTest.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.rocketmq.acl.plug.strategy; import org.apache.rocketmq.acl.plug.entity.AccessControl; diff --git a/acl-plug/src/test/resources/conf/transport.yml b/acl-plug/src/test/resources/conf/transport.yml index 897aedd3fd9643478e8f8ad093239da8d2a8f114..4f29f4ee49a246ccf0e66b4e370e9df91222ec40 100644 --- a/acl-plug/src/test/resources/conf/transport.yml +++ b/acl-plug/src/test/resources/conf/transport.yml @@ -4,14 +4,14 @@ onlyNetAddress: - broker-a list: -- account: laohu - password: 123456 +- account: rocketMQ + password: 1234567 netaddress: 192.0.0.* permitSendTopic: - test1 - test2 -- account: laohu - password: 123456 +- account: rocketMQ + password: 1234567 netaddress: 192.0.2.1 permitSendTopic: - test3 diff --git a/broker/pom.xml b/broker/pom.xml index 7c67de57c1fe185bbc6f9a5e3c24575caf160760..c353eb32b830cbc7be701d52537462f370ea8a33 100644 --- a/broker/pom.xml +++ b/broker/pom.xml @@ -9,81 +9,81 @@ OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. --> - - - org.apache.rocketmq - rocketmq-all - 4.4.0-SNAPSHOT - + + + org.apache.rocketmq + rocketmq-all + 4.4.0-SNAPSHOT + - 4.0.0 - jar - rocketmq-broker - rocketmq-broker ${project.version} + 4.0.0 + jar + rocketmq-broker + rocketmq-broker ${project.version} - - - ${project.groupId} - rocketmq-common - - - ${project.groupId} - rocketmq-store - - - ${project.groupId} - rocketmq-remoting - - - ${project.groupId} - rocketmq-client - - - ${project.groupId} - rocketmq-srvutil - - - ${project.groupId} - rocketmq-filter - - - ${project.groupId} - rocketmq-acl-plug - - - ch.qos.logback - logback-classic - - - ch.qos.logback - logback-core - - - com.alibaba - fastjson - - - org.javassist - javassist - - - org.slf4j - slf4j-api - - + + + ${project.groupId} + rocketmq-common + + + ${project.groupId} + rocketmq-store + + + ${project.groupId} + rocketmq-remoting + + + ${project.groupId} + rocketmq-client + + + ${project.groupId} + rocketmq-srvutil + + + ${project.groupId} + rocketmq-filter + + + ${project.groupId} + rocketmq-acl-plug + + + ch.qos.logback + logback-classic + + + ch.qos.logback + logback-core + + + com.alibaba + fastjson + + + org.javassist + javassist + + + org.slf4j + slf4j-api + + - - - - maven-surefire-plugin - 2.19.1 - - 1 - false - - - - + + + + maven-surefire-plugin + 2.19.1 + + 1 + false + + + + diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index e6c83ed577497ff648d219b09a7d9c0edf9f0055..4f3b736f0aed01ad3316d20835f888042bef1e2a 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -302,7 +302,6 @@ public class BrokerController { this.heartbeatThreadPoolQueue, new ThreadFactoryImpl("HeartbeatThread_", true)); - this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor( this.brokerConfig.getEndTransactionThreadPoolNums(), this.brokerConfig.getEndTransactionThreadPoolNums(), @@ -1101,12 +1100,12 @@ public class BrokerController { this.transactionalMessageCheckListener = transactionalMessageCheckListener; } -<<<<<<< HEAD public AclPlugController getAclPlugController() { return this.aclPlugController; -======= + } + public BlockingQueue getEndTransactionThreadPoolQueue() { return endTransactionThreadPoolQueue; ->>>>>>> 53a63460d3a1599a6c51058bb51a73746233022d + } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java index 04794d1dc1ba1f825a934d71732ab0568e4861e3..f4ecc2c046beeb35e8e78435e275efe716c27af6 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java @@ -72,7 +72,9 @@ public class ClientHousekeepingService implements ChannelEventListener { this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel); this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel); this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel); - this.brokerController.getAclPlugController().doChannelCloseEvent(remoteAddr); + if (this.brokerController.getAclPlugController() != null && this.brokerController.getAclPlugController().isStartSucceed()) { + this.brokerController.getAclPlugController().doChannelCloseEvent(remoteAddr); + } } @Override diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index 9920cc49da2ba91f7212d3f904e35c1d9bd188ff..6e11de20ff25b1fe7bb2e1baf3beda46550ac6ea 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -70,7 +70,6 @@ public class BrokerConfig { */ private int endTransactionThreadPoolNums = 8 + Runtime.getRuntime().availableProcessors() * 2; - private int flushConsumerOffsetInterval = 1000 * 5; private int flushConsumerOffsetHistoryInterval = 1000 * 60; @@ -174,6 +173,16 @@ public class BrokerConfig { private boolean isAclPlug; + public static String localHostName() { + try { + return InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + log.error("Failed to obtain the host name", e); + } + + return "DEFAULT_BROKER"; + } + public boolean isTraceOn() { return traceOn; } @@ -238,16 +247,6 @@ public class BrokerConfig { this.slaveReadEnable = slaveReadEnable; } - public static String localHostName() { - try { - return InetAddress.getLocalHost().getHostName(); - } catch (UnknownHostException e) { - log.error("Failed to obtain the host name", e); - } - - return "DEFAULT_BROKER"; - } - public int getRegisterBrokerTimeoutMills() { return registerBrokerTimeoutMills; } @@ -712,7 +711,6 @@ public class BrokerConfig { this.transactionCheckInterval = transactionCheckInterval; } - public boolean isAclPlug() { return isAclPlug; } diff --git a/distribution/conf/transport.yml b/distribution/conf/transport.yml index 424fd8d8e7bdb6aed468ddbc0c930ff0d3f9d96a..d9552ac73e21d443272e2f2d8e0d2b1d5cecdf8a 100644 --- a/distribution/conf/transport.yml +++ b/distribution/conf/transport.yml @@ -4,14 +4,14 @@ onlyNetAddress: - broker-a list: - - account: laohu - password: 123456 + - account: RocketMQ + password: 1234567 netaddress: 192.0.0.* permitSendTopic: - test1 - test2 - - account: laohu - password: 123456 + - account: RocketMQ + password: 1234567 netaddress: 192.0.2.1 permitSendTopic: - test3