diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/AccessValidator.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/AccessValidator.java index d573e56cf1fcfd59d68599a61e9523019fd0c98c..46f5728b43f52f4aeeee552cc921f73114a1735f 100644 --- a/acl-plug/src/main/java/org/apache/rocketmq/acl/AccessValidator.java +++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/AccessValidator.java @@ -25,7 +25,7 @@ public interface AccessValidator { * @param request * @return */ - AccessResource parse(RemotingCommand request); + AccessResource parse(RemotingCommand request,String remoteAddr); /** * Validate the access resource. diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/DefaultAccessValidator.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/DefaultAccessValidator.java index 859cc809243b71d9fc4e5fc6cafafa387929f918..215a756e01bd45f7966f5dd114fe0add9f38ae90 100644 --- a/acl-plug/src/main/java/org/apache/rocketmq/acl/DefaultAccessValidator.java +++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/DefaultAccessValidator.java @@ -21,7 +21,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; public class DefaultAccessValidator implements AccessValidator { - @Override public AccessResource parse(RemotingCommand request) { + @Override public AccessResource parse(RemotingCommand request,String remoteAddr ) { return null; } diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/DefaultAclRemotingServiceImpl.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/DefaultAclRemotingServiceImpl.java index b42205abcf078011adf6eb7a0eb20aa4f644d038..7bb13a181062c06ab51590aab0c09d9b77b70829 100644 --- a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/DefaultAclRemotingServiceImpl.java +++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/DefaultAclRemotingServiceImpl.java @@ -16,15 +16,29 @@ */ package org.apache.rocketmq.acl.plug; +import java.util.HashMap; + +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.acl.AccessResource; +import org.apache.rocketmq.acl.AccessValidator; import org.apache.rocketmq.acl.plug.engine.AclPlugEngine; +import org.apache.rocketmq.acl.plug.engine.PlainAclPlugEngine; import org.apache.rocketmq.acl.plug.entity.AccessControl; import org.apache.rocketmq.acl.plug.entity.AuthenticationResult; +import org.apache.rocketmq.acl.plug.entity.ControllerParameters; import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; -public class DefaultAclRemotingServiceImpl implements AclRemotingService { +public class DefaultAclRemotingServiceImpl implements AclRemotingService ,AccessValidator{ private AclPlugEngine aclPlugEngine; + public DefaultAclRemotingServiceImpl() { + ControllerParameters controllerParameters = new ControllerParameters(); + this.aclPlugEngine = new PlainAclPlugEngine(controllerParameters); + this.aclPlugEngine.initialize(); + } + public DefaultAclRemotingServiceImpl(AclPlugEngine aclPlugEngine) { this.aclPlugEngine = aclPlugEngine; } @@ -41,4 +55,30 @@ public class DefaultAclRemotingServiceImpl implements AclRemotingService { return authenticationResult; } + @Override + public AccessResource parse(RemotingCommand request ,String remoteAddr) { + HashMap extFields = request.getExtFields(); + AccessControl accessControl = new AccessControl(); + accessControl.setCode(request.getCode()); + accessControl.setRecognition(remoteAddr); + if (extFields != null) { + accessControl.setAccount(extFields.get("account")); + accessControl.setPassword(extFields.get("password")); + accessControl.setNetaddress(StringUtils.split(remoteAddr, ":")[0]); + accessControl.setTopic(extFields.get("topic")); +} + return accessControl; + } + + @Override + public void validate(AccessResource accessResource) { + AuthenticationResult authenticationResult = aclPlugEngine.eachCheckAuthentication((AccessControl)accessResource); + if (authenticationResult.getException() != null) { + throw new AclPlugRuntimeException(String.format("eachCheck the inspection appear exception, accessControl data is %s", accessResource.toString()), authenticationResult.getException()); + } + if (authenticationResult.getAccessControl() == null || !authenticationResult.isSucceed()) { + throw new AclPlugRuntimeException(String.format("%s accessControl data is %s", authenticationResult.getResultString(), accessResource.toString())); + } + } + } diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/AclPlugEngine.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/AclPlugEngine.java index badae946c1ee59b70db9fab9c4edf69be4084855..e4ef9872741a9cbc779ce90fac92a8b981d66166 100644 --- a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/AclPlugEngine.java +++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/AclPlugEngine.java @@ -30,6 +30,8 @@ public interface AclPlugEngine { public void deleteLoginInfo(String remoteAddr); public AuthenticationResult eachCheckLoginAndAuthentication(AccessControl accessControl); + + public AuthenticationResult eachCheckAuthentication(AccessControl accessControl); public void initialize(); } diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/AuthenticationInfoManagementAclPlugEngine.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/AuthenticationInfoManagementAclPlugEngine.java index 6aac6bd438e0406abacc92c7034bee53d3430af8..12f7d8b29a34dda1fd9dbd2dde8053138094539b 100644 --- a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/AuthenticationInfoManagementAclPlugEngine.java +++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/AuthenticationInfoManagementAclPlugEngine.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.acl.plug.engine; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -37,7 +38,7 @@ public abstract class AuthenticationInfoManagementAclPlugEngine implements AclPl private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.ACL_PLUG_LOGGER_NAME); ControllerParameters controllerParameters; - private Map> accessControlMap = new HashMap<>(); + private Map> accessControlMap = new HashMap<>(); private AuthenticationInfo authenticationInfo; private NetaddressStrategyFactory netaddressStrategyFactory = new NetaddressStrategyFactory(); private AccessContralAnalysis accessContralAnalysis = new AccessContralAnalysis(); @@ -54,13 +55,13 @@ public abstract class AuthenticationInfoManagementAclPlugEngine implements AclPl } try { NetaddressStrategy netaddressStrategy = netaddressStrategyFactory.getNetaddressStrategy(accessControl); - Map accessControlAddressMap = accessControlMap.get(accessControl.getAccount()); - if (accessControlAddressMap == null) { - accessControlAddressMap = new HashMap<>(); - accessControlMap.put(accessControl.getAccount(), accessControlAddressMap); + List accessControlAddressList = accessControlMap.get(accessControl.getAccount()); + if (accessControlAddressList == null) { + accessControlAddressList = new ArrayList<>(); + accessControlMap.put(accessControl.getAccount(), accessControlAddressList); } AuthenticationInfo authenticationInfo = new AuthenticationInfo(accessContralAnalysis.analysis(accessControl), accessControl, netaddressStrategy); - accessControlAddressMap.put(accessControl.getNetaddress(), authenticationInfo); + accessControlAddressList.add( authenticationInfo); log.info("authenticationInfo is {}", authenticationInfo.toString()); } catch (Exception e) { throw new AclPlugRuntimeException(String.format("Exception info %s %s" ,e.getMessage() , accessControl.toString()), e); @@ -84,24 +85,19 @@ public abstract class AuthenticationInfoManagementAclPlugEngine implements AclPl } public AuthenticationInfo getAccessControl(AccessControl accessControl) { - AuthenticationInfo existing = null; if (accessControl.getAccount() == null && authenticationInfo != null) { - existing = authenticationInfo.getNetaddressStrategy().match(accessControl) ? authenticationInfo : null; + return authenticationInfo.getNetaddressStrategy().match(accessControl) ? authenticationInfo : null; } else { - Map accessControlAddressMap = accessControlMap.get(accessControl.getAccount()); - if (accessControlAddressMap != null) { - existing = accessControlAddressMap.get(accessControl.getNetaddress()); - if (existing == null) - return null; - if (existing.getAccessControl().getPassword().equals(accessControl.getPassword())) { - if (existing.getNetaddressStrategy().match(accessControl)) { - return existing; - } - } - existing = null; + List accessControlAddressList = accessControlMap.get(accessControl.getAccount()); + if (accessControlAddressList != null) { + for(AuthenticationInfo ai : accessControlAddressList) { + if(ai.getNetaddressStrategy().match(accessControl)&&ai.getAccessControl().getPassword().equals(accessControl.getPassword())) { + return ai; + } + } } } - return existing; + return null; } @Override @@ -112,12 +108,27 @@ public abstract class AuthenticationInfoManagementAclPlugEngine implements AclPl if (authenticationInfo != null) { boolean boo = authentication.authentication(authenticationInfo, accessControl, authenticationResult); authenticationResult.setSucceed(boo); + authenticationResult.setAccessControl(authenticationInfo.getAccessControl()); } } catch (Exception e) { authenticationResult.setException(e); } return authenticationResult; } + + public AuthenticationResult eachCheckAuthentication(AccessControl accessControl) { + AuthenticationResult authenticationResult = new AuthenticationResult(); + AuthenticationInfo authenticationInfo = getAccessControl(accessControl); + if(authenticationInfo != null) { + boolean boo = authentication.authentication(authenticationInfo, accessControl, authenticationResult); + authenticationResult.setSucceed(boo); + }else { + authenticationResult.setResultString("accessControl is null, Please check login, password, IP\""); + } + + + return authenticationResult; + } void setBorkerAccessControlTransport(BorkerAccessControlTransport transport) { if (transport.getOnlyNetAddress() == null && (transport.getList() == null || transport.getList().size() == 0)) { diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/AccessControl.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/AccessControl.java index cf3a736a7f80f59c42145fd217491f632fbb00ce..b46a034b51e46ad2726bfbde16190ce86912bebe 100644 --- a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/AccessControl.java +++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/AccessControl.java @@ -16,7 +16,9 @@ */ package org.apache.rocketmq.acl.plug.entity; -public class AccessControl { +import org.apache.rocketmq.acl.AccessResource; + +public class AccessControl implements AccessResource{ private String account; diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/ControllerParameters.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/ControllerParameters.java index 708bcbeb9153c194a2c95dca8d1aec16913a135f..74ae4a7ecb7ed13e51071d3bd5ac05fb5eb95d50 100644 --- a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/ControllerParameters.java +++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/ControllerParameters.java @@ -16,11 +16,12 @@ */ package org.apache.rocketmq.acl.plug.entity; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.protocol.RequestCode; public class ControllerParameters { - private String fileHome; + private String fileHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); private Class accessContralAnalysisClass = RequestCode.class; diff --git a/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AclRemotingServiceTest.java b/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AclRemotingServiceTest.java new file mode 100644 index 0000000000000000000000000000000000000000..c0d8cdb15378cda15bd78ec9d227a6ba2c62be07 --- /dev/null +++ b/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AclRemotingServiceTest.java @@ -0,0 +1,132 @@ +package org.apache.rocketmq.acl.plug; + +import java.util.HashMap; + +import org.apache.rocketmq.acl.AccessResource; +import org.apache.rocketmq.acl.AccessValidator; +import org.apache.rocketmq.acl.plug.entity.AccessControl; +import org.apache.rocketmq.acl.plug.entity.AuthenticationResult; +import org.apache.rocketmq.acl.plug.entity.BorkerAccessControl; +import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test;; + +public class AclRemotingServiceTest { + + + AclRemotingService defaultAclService; + + AccessValidator accessValidator; + + AccessControl accessControl; + + AccessControl accessControlTwo; + + @Before + public void init() { + System.setProperty("rocketmq.home.dir", "src/test/resources"); + DefaultAclRemotingServiceImpl aclRemotingServiceImpl = new DefaultAclRemotingServiceImpl(); + defaultAclService = aclRemotingServiceImpl; + accessValidator = aclRemotingServiceImpl; + + accessControl = new BorkerAccessControl(); + accessControl.setAccount("RocketMQ"); + accessControl.setPassword("1234567"); + accessControl.setNetaddress("192.0.0.1"); + accessControl.setRecognition("127.0.0.1:1"); + + accessControlTwo = new BorkerAccessControl(); + accessControlTwo.setAccount("RocketMQ"); + accessControlTwo.setPassword("1234567"); + accessControlTwo.setNetaddress("192.0.2.1"); + accessControlTwo.setRecognition("127.0.0.1:2"); + } + + + + @Test + public void defaultConstructorTest() { + System.setProperty("rocketmq.home.dir", "src/test/resources"); + AclRemotingService defaultAclService = new DefaultAclRemotingServiceImpl(); + Assert.assertNotNull(defaultAclService); + } + + @Test + public void parseTest() { + RemotingCommand remotingCommand = RemotingCommand.createResponseCommand(34, ""); + HashMap map = new HashMap<>(); + map.put("account", "RocketMQ"); + map.put("password","123456"); + map.put("topic","test"); + remotingCommand.setExtFields(map); + + AccessResource accessResource = accessValidator.parse(remotingCommand, "127.0.0.1:123"); + AccessControl accessControl = (AccessControl)accessResource; + AccessControl newAccessControl = new AccessControl(); + newAccessControl.setAccount("RocketMQ"); + newAccessControl.setPassword("123456"); + newAccessControl.setTopic("test"); + newAccessControl.setCode(34); + newAccessControl.setNetaddress("127.0.0.1"); + newAccessControl.setRecognition("127.0.0.1:123"); + Assert.assertEquals(accessControl.toString(), newAccessControl.toString()); + } + + @Test + public void checkTest() { + accessControl.setCode(34); + AuthenticationResult authenticationResult = defaultAclService.check(accessControl); + Assert.assertTrue(authenticationResult.isSucceed()); + } + + @Test(expected=AclPlugRuntimeException.class) + public void checkAccessExceptionTest() { + accessControl.setCode(34); + accessControl.setAccount("Rocketmq"); + defaultAclService.check(accessControl); + } + + @Test(expected=AclPlugRuntimeException.class) + public void checkPasswordTest() { + accessControl.setCode(34); + accessControl.setPassword("123123123"); + defaultAclService.check(accessControl); + } + + @Test(expected=AclPlugRuntimeException.class) + public void checkCodeTest() { + accessControl.setCode(14434); + accessControl.setPassword("123123123"); + defaultAclService.check(accessControl); + } + + + @Test + public void validateTest() { + accessControl.setCode(34); + accessValidator.validate(accessControl); + } + + @Test(expected=AclPlugRuntimeException.class) + public void validateAccessExceptionTest() { + accessControl.setCode(34); + accessControl.setAccount("Rocketmq"); + accessValidator.validate(accessControl); + } + + @Test(expected=AclPlugRuntimeException.class) + public void validatePasswordTest() { + accessControl.setCode(34); + accessControl.setPassword("123123123"); + accessValidator.validate(accessControl); + } + + @Test(expected=AclPlugRuntimeException.class) + public void validateCodeTest() { + accessControl.setCode(14434); + accessControl.setPassword("123123123"); + accessValidator.validate(accessControl); + } +} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 7a4c105e79fcfbc87cb30a3c39bc0a587a02ecb8..d06949ca1f25b61d0e398b6d946ba0d2526f62e9 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -160,7 +160,6 @@ public class BrokerController { private TransactionalMessageService transactionalMessageService; private AbstractTransactionalMessageCheckListener transactionalMessageCheckListener; - private AclPlugController aclPlugController; public BrokerController( final BrokerConfig brokerConfig, @@ -510,7 +509,7 @@ public class BrokerController { @Override public void doBeforeRequest(String remoteAddr, RemotingCommand request) { - validator.validate(validator.parse(request)); + validator.validate(validator.parse(request, remoteAddr)); } @Override @@ -1095,9 +1094,6 @@ public class BrokerController { this.transactionalMessageCheckListener = transactionalMessageCheckListener; } - public AclPlugController getAclPlugController() { - return this.aclPlugController; - } public BlockingQueue getEndTransactionThreadPoolQueue() { return endTransactionThreadPoolQueue; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java index f4ecc2c046beeb35e8e78435e275efe716c27af6..d536db5055eb2c2b7b6d8b5f4c80af743dd81a50 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java @@ -72,9 +72,6 @@ public class ClientHousekeepingService implements ChannelEventListener { this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel); this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel); this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel); - if (this.brokerController.getAclPlugController() != null && this.brokerController.getAclPlugController().isStartSucceed()) { - this.brokerController.getAclPlugController().doChannelCloseEvent(remoteAddr); - } } @Override diff --git a/broker/src/main/resources/META-INF/service/org.apache.rocketmq.acl.AccessValidator b/broker/src/main/resources/META-INF/service/org.apache.rocketmq.acl.AccessValidator new file mode 100644 index 0000000000000000000000000000000000000000..2f26220e5e106536a6b36e32cbf534e874e259fd --- /dev/null +++ b/broker/src/main/resources/META-INF/service/org.apache.rocketmq.acl.AccessValidator @@ -0,0 +1 @@ +org.apache.rocketmq.acl.plug.DefaultAclRemotingServiceImpl \ No newline at end of file diff --git a/broker/src/test/java/org/apache/rocketmq/broker/util/ServiceProviderTest.java b/broker/src/test/java/org/apache/rocketmq/broker/util/ServiceProviderTest.java index 22228a6e0ef2cd9231b93a7cba8ca4f1a57cd2ee..a3a35c8832dd4222a6b3d338e46b9879c4e7b1e1 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/util/ServiceProviderTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/util/ServiceProviderTest.java @@ -17,12 +17,15 @@ package org.apache.rocketmq.broker.util; +import org.apache.rocketmq.acl.AccessValidator; import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener; import org.apache.rocketmq.broker.transaction.TransactionalMessageService; import org.junit.Test; import static org.assertj.core.api.Assertions.assertThat; +import java.util.List; + public class ServiceProviderTest { @Test @@ -38,4 +41,10 @@ public class ServiceProviderTest { AbstractTransactionalMessageCheckListener.class); assertThat(listener).isNotNull(); } + + @Test + public void loadAccessValidatorTest() { + List accessValidators = ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class); + assertThat(accessValidators).isNotNull(); + } } diff --git a/broker/src/test/resources/META-INF/service/org.apache.rocketmq.acl.AccessValidator b/broker/src/test/resources/META-INF/service/org.apache.rocketmq.acl.AccessValidator new file mode 100644 index 0000000000000000000000000000000000000000..2f26220e5e106536a6b36e32cbf534e874e259fd --- /dev/null +++ b/broker/src/test/resources/META-INF/service/org.apache.rocketmq.acl.AccessValidator @@ -0,0 +1 @@ +org.apache.rocketmq.acl.plug.DefaultAclRemotingServiceImpl \ No newline at end of file diff --git a/distribution/conf/broker.conf b/distribution/conf/broker.conf index 363bcbc03a646d0f3a3380b155ae15ec284c5e68..970395735d7d9a68b796c72dc92c9acfbf40ffae 100644 --- a/distribution/conf/broker.conf +++ b/distribution/conf/broker.conf @@ -20,5 +20,5 @@ deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH -aclPlug=true +enableAcl=true namesrvAddr=127.0.0.1:9876 diff --git a/distribution/conf/transport.yml b/distribution/conf/transport.yml index 25d4902a67f9dc5d6c0d8e0b4d543f7350c23013..f86e68d411aa54fe42f62c9bd2734e334e81239a 100644 --- a/distribution/conf/transport.yml +++ b/distribution/conf/transport.yml @@ -14,7 +14,7 @@ # limitations under the License. onlyNetAddress: - netaddress: 10.10.103.* + netaddress: 127.0.0.* noPermitPullTopic: - broker-a @@ -31,4 +31,4 @@ list: permitSendTopic: - test3 - test4 - \ No newline at end of file + diff --git a/pom.xml b/pom.xml index 535893c21002260509dece08a43f157b98907970..38e518da28b09f781b522039b0a74a588d98707c 100644 --- a/pom.xml +++ b/pom.xml @@ -216,9 +216,9 @@ generate-effective-dependencies-pom generate-resources - + ${project.build.directory}/effective-pom/effective-dependencies.xml