From 74f4213b06c910946ff916e0781f0a181600fb83 Mon Sep 17 00:00:00 2001 From: laohu <8wy118611@163.com> Date: Thu, 1 Nov 2018 20:07:27 +0800 Subject: [PATCH] arrange --- .../apache/rocketmq/acl/AccessValidator.java | 2 +- .../rocketmq/acl/DefaultAccessValidator.java | 2 +- .../plug/DefaultAclRemotingServiceImpl.java | 42 +++++- .../acl/plug/engine/AclPlugEngine.java | 2 + ...enticationInfoManagementAclPlugEngine.java | 51 ++++--- .../acl/plug/entity/AccessControl.java | 4 +- .../acl/plug/entity/ControllerParameters.java | 3 +- .../acl/plug/AclRemotingServiceTest.java | 132 ++++++++++++++++++ .../rocketmq/broker/BrokerController.java | 6 +- .../client/ClientHousekeepingService.java | 3 - .../org.apache.rocketmq.acl.AccessValidator | 1 + .../broker/util/ServiceProviderTest.java | 9 ++ .../org.apache.rocketmq.acl.AccessValidator | 1 + distribution/conf/broker.conf | 2 +- distribution/conf/transport.yml | 4 +- pom.xml | 4 +- 16 files changed, 230 insertions(+), 38 deletions(-) create mode 100644 acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AclRemotingServiceTest.java create mode 100644 broker/src/main/resources/META-INF/service/org.apache.rocketmq.acl.AccessValidator create mode 100644 broker/src/test/resources/META-INF/service/org.apache.rocketmq.acl.AccessValidator diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/AccessValidator.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/AccessValidator.java index d573e56c..46f5728b 100644 --- a/acl-plug/src/main/java/org/apache/rocketmq/acl/AccessValidator.java +++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/AccessValidator.java @@ -25,7 +25,7 @@ public interface AccessValidator { * @param request * @return */ - AccessResource parse(RemotingCommand request); + AccessResource parse(RemotingCommand request,String remoteAddr); /** * Validate the access resource. diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/DefaultAccessValidator.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/DefaultAccessValidator.java index 859cc809..215a756e 100644 --- a/acl-plug/src/main/java/org/apache/rocketmq/acl/DefaultAccessValidator.java +++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/DefaultAccessValidator.java @@ -21,7 +21,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; public class DefaultAccessValidator implements AccessValidator { - @Override public AccessResource parse(RemotingCommand request) { + @Override public AccessResource parse(RemotingCommand request,String remoteAddr ) { return null; } diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/DefaultAclRemotingServiceImpl.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/DefaultAclRemotingServiceImpl.java index b42205ab..7bb13a18 100644 --- a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/DefaultAclRemotingServiceImpl.java +++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/DefaultAclRemotingServiceImpl.java @@ -16,15 +16,29 @@ */ package org.apache.rocketmq.acl.plug; +import java.util.HashMap; + +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.acl.AccessResource; +import org.apache.rocketmq.acl.AccessValidator; import org.apache.rocketmq.acl.plug.engine.AclPlugEngine; +import org.apache.rocketmq.acl.plug.engine.PlainAclPlugEngine; import org.apache.rocketmq.acl.plug.entity.AccessControl; import org.apache.rocketmq.acl.plug.entity.AuthenticationResult; +import org.apache.rocketmq.acl.plug.entity.ControllerParameters; import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; -public class DefaultAclRemotingServiceImpl implements AclRemotingService { +public class DefaultAclRemotingServiceImpl implements AclRemotingService ,AccessValidator{ private AclPlugEngine aclPlugEngine; + public DefaultAclRemotingServiceImpl() { + ControllerParameters controllerParameters = new ControllerParameters(); + this.aclPlugEngine = new PlainAclPlugEngine(controllerParameters); + this.aclPlugEngine.initialize(); + } + public DefaultAclRemotingServiceImpl(AclPlugEngine aclPlugEngine) { this.aclPlugEngine = aclPlugEngine; } @@ -41,4 +55,30 @@ public class DefaultAclRemotingServiceImpl implements AclRemotingService { return authenticationResult; } + @Override + public AccessResource parse(RemotingCommand request ,String remoteAddr) { + HashMap extFields = request.getExtFields(); + AccessControl accessControl = new AccessControl(); + accessControl.setCode(request.getCode()); + accessControl.setRecognition(remoteAddr); + if (extFields != null) { + accessControl.setAccount(extFields.get("account")); + accessControl.setPassword(extFields.get("password")); + accessControl.setNetaddress(StringUtils.split(remoteAddr, ":")[0]); + accessControl.setTopic(extFields.get("topic")); +} + return accessControl; + } + + @Override + public void validate(AccessResource accessResource) { + AuthenticationResult authenticationResult = aclPlugEngine.eachCheckAuthentication((AccessControl)accessResource); + if (authenticationResult.getException() != null) { + throw new AclPlugRuntimeException(String.format("eachCheck the inspection appear exception, accessControl data is %s", accessResource.toString()), authenticationResult.getException()); + } + if (authenticationResult.getAccessControl() == null || !authenticationResult.isSucceed()) { + throw new AclPlugRuntimeException(String.format("%s accessControl data is %s", authenticationResult.getResultString(), accessResource.toString())); + } + } + } diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/AclPlugEngine.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/AclPlugEngine.java index badae946..e4ef9872 100644 --- a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/AclPlugEngine.java +++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/AclPlugEngine.java @@ -30,6 +30,8 @@ public interface AclPlugEngine { public void deleteLoginInfo(String remoteAddr); public AuthenticationResult eachCheckLoginAndAuthentication(AccessControl accessControl); + + public AuthenticationResult eachCheckAuthentication(AccessControl accessControl); public void initialize(); } diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/AuthenticationInfoManagementAclPlugEngine.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/AuthenticationInfoManagementAclPlugEngine.java index 6aac6bd4..12f7d8b2 100644 --- a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/AuthenticationInfoManagementAclPlugEngine.java +++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/AuthenticationInfoManagementAclPlugEngine.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.acl.plug.engine; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -37,7 +38,7 @@ public abstract class AuthenticationInfoManagementAclPlugEngine implements AclPl private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.ACL_PLUG_LOGGER_NAME); ControllerParameters controllerParameters; - private Map> accessControlMap = new HashMap<>(); + private Map> accessControlMap = new HashMap<>(); private AuthenticationInfo authenticationInfo; private NetaddressStrategyFactory netaddressStrategyFactory = new NetaddressStrategyFactory(); private AccessContralAnalysis accessContralAnalysis = new AccessContralAnalysis(); @@ -54,13 +55,13 @@ public abstract class AuthenticationInfoManagementAclPlugEngine implements AclPl } try { NetaddressStrategy netaddressStrategy = netaddressStrategyFactory.getNetaddressStrategy(accessControl); - Map accessControlAddressMap = accessControlMap.get(accessControl.getAccount()); - if (accessControlAddressMap == null) { - accessControlAddressMap = new HashMap<>(); - accessControlMap.put(accessControl.getAccount(), accessControlAddressMap); + List accessControlAddressList = accessControlMap.get(accessControl.getAccount()); + if (accessControlAddressList == null) { + accessControlAddressList = new ArrayList<>(); + accessControlMap.put(accessControl.getAccount(), accessControlAddressList); } AuthenticationInfo authenticationInfo = new AuthenticationInfo(accessContralAnalysis.analysis(accessControl), accessControl, netaddressStrategy); - accessControlAddressMap.put(accessControl.getNetaddress(), authenticationInfo); + accessControlAddressList.add( authenticationInfo); log.info("authenticationInfo is {}", authenticationInfo.toString()); } catch (Exception e) { throw new AclPlugRuntimeException(String.format("Exception info %s %s" ,e.getMessage() , accessControl.toString()), e); @@ -84,24 +85,19 @@ public abstract class AuthenticationInfoManagementAclPlugEngine implements AclPl } public AuthenticationInfo getAccessControl(AccessControl accessControl) { - AuthenticationInfo existing = null; if (accessControl.getAccount() == null && authenticationInfo != null) { - existing = authenticationInfo.getNetaddressStrategy().match(accessControl) ? authenticationInfo : null; + return authenticationInfo.getNetaddressStrategy().match(accessControl) ? authenticationInfo : null; } else { - Map accessControlAddressMap = accessControlMap.get(accessControl.getAccount()); - if (accessControlAddressMap != null) { - existing = accessControlAddressMap.get(accessControl.getNetaddress()); - if (existing == null) - return null; - if (existing.getAccessControl().getPassword().equals(accessControl.getPassword())) { - if (existing.getNetaddressStrategy().match(accessControl)) { - return existing; - } - } - existing = null; + List accessControlAddressList = accessControlMap.get(accessControl.getAccount()); + if (accessControlAddressList != null) { + for(AuthenticationInfo ai : accessControlAddressList) { + if(ai.getNetaddressStrategy().match(accessControl)&&ai.getAccessControl().getPassword().equals(accessControl.getPassword())) { + return ai; + } + } } } - return existing; + return null; } @Override @@ -112,12 +108,27 @@ public abstract class AuthenticationInfoManagementAclPlugEngine implements AclPl if (authenticationInfo != null) { boolean boo = authentication.authentication(authenticationInfo, accessControl, authenticationResult); authenticationResult.setSucceed(boo); + authenticationResult.setAccessControl(authenticationInfo.getAccessControl()); } } catch (Exception e) { authenticationResult.setException(e); } return authenticationResult; } + + public AuthenticationResult eachCheckAuthentication(AccessControl accessControl) { + AuthenticationResult authenticationResult = new AuthenticationResult(); + AuthenticationInfo authenticationInfo = getAccessControl(accessControl); + if(authenticationInfo != null) { + boolean boo = authentication.authentication(authenticationInfo, accessControl, authenticationResult); + authenticationResult.setSucceed(boo); + }else { + authenticationResult.setResultString("accessControl is null, Please check login, password, IP\""); + } + + + return authenticationResult; + } void setBorkerAccessControlTransport(BorkerAccessControlTransport transport) { if (transport.getOnlyNetAddress() == null && (transport.getList() == null || transport.getList().size() == 0)) { diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/AccessControl.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/AccessControl.java index cf3a736a..b46a034b 100644 --- a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/AccessControl.java +++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/AccessControl.java @@ -16,7 +16,9 @@ */ package org.apache.rocketmq.acl.plug.entity; -public class AccessControl { +import org.apache.rocketmq.acl.AccessResource; + +public class AccessControl implements AccessResource{ private String account; diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/ControllerParameters.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/ControllerParameters.java index 708bcbeb..74ae4a7e 100644 --- a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/ControllerParameters.java +++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/ControllerParameters.java @@ -16,11 +16,12 @@ */ package org.apache.rocketmq.acl.plug.entity; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.protocol.RequestCode; public class ControllerParameters { - private String fileHome; + private String fileHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); private Class accessContralAnalysisClass = RequestCode.class; diff --git a/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AclRemotingServiceTest.java b/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AclRemotingServiceTest.java new file mode 100644 index 00000000..c0d8cdb1 --- /dev/null +++ b/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AclRemotingServiceTest.java @@ -0,0 +1,132 @@ +package org.apache.rocketmq.acl.plug; + +import java.util.HashMap; + +import org.apache.rocketmq.acl.AccessResource; +import org.apache.rocketmq.acl.AccessValidator; +import org.apache.rocketmq.acl.plug.entity.AccessControl; +import org.apache.rocketmq.acl.plug.entity.AuthenticationResult; +import org.apache.rocketmq.acl.plug.entity.BorkerAccessControl; +import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test;; + +public class AclRemotingServiceTest { + + + AclRemotingService defaultAclService; + + AccessValidator accessValidator; + + AccessControl accessControl; + + AccessControl accessControlTwo; + + @Before + public void init() { + System.setProperty("rocketmq.home.dir", "src/test/resources"); + DefaultAclRemotingServiceImpl aclRemotingServiceImpl = new DefaultAclRemotingServiceImpl(); + defaultAclService = aclRemotingServiceImpl; + accessValidator = aclRemotingServiceImpl; + + accessControl = new BorkerAccessControl(); + accessControl.setAccount("RocketMQ"); + accessControl.setPassword("1234567"); + accessControl.setNetaddress("192.0.0.1"); + accessControl.setRecognition("127.0.0.1:1"); + + accessControlTwo = new BorkerAccessControl(); + accessControlTwo.setAccount("RocketMQ"); + accessControlTwo.setPassword("1234567"); + accessControlTwo.setNetaddress("192.0.2.1"); + accessControlTwo.setRecognition("127.0.0.1:2"); + } + + + + @Test + public void defaultConstructorTest() { + System.setProperty("rocketmq.home.dir", "src/test/resources"); + AclRemotingService defaultAclService = new DefaultAclRemotingServiceImpl(); + Assert.assertNotNull(defaultAclService); + } + + @Test + public void parseTest() { + RemotingCommand remotingCommand = RemotingCommand.createResponseCommand(34, ""); + HashMap map = new HashMap<>(); + map.put("account", "RocketMQ"); + map.put("password","123456"); + map.put("topic","test"); + remotingCommand.setExtFields(map); + + AccessResource accessResource = accessValidator.parse(remotingCommand, "127.0.0.1:123"); + AccessControl accessControl = (AccessControl)accessResource; + AccessControl newAccessControl = new AccessControl(); + newAccessControl.setAccount("RocketMQ"); + newAccessControl.setPassword("123456"); + newAccessControl.setTopic("test"); + newAccessControl.setCode(34); + newAccessControl.setNetaddress("127.0.0.1"); + newAccessControl.setRecognition("127.0.0.1:123"); + Assert.assertEquals(accessControl.toString(), newAccessControl.toString()); + } + + @Test + public void checkTest() { + accessControl.setCode(34); + AuthenticationResult authenticationResult = defaultAclService.check(accessControl); + Assert.assertTrue(authenticationResult.isSucceed()); + } + + @Test(expected=AclPlugRuntimeException.class) + public void checkAccessExceptionTest() { + accessControl.setCode(34); + accessControl.setAccount("Rocketmq"); + defaultAclService.check(accessControl); + } + + @Test(expected=AclPlugRuntimeException.class) + public void checkPasswordTest() { + accessControl.setCode(34); + accessControl.setPassword("123123123"); + defaultAclService.check(accessControl); + } + + @Test(expected=AclPlugRuntimeException.class) + public void checkCodeTest() { + accessControl.setCode(14434); + accessControl.setPassword("123123123"); + defaultAclService.check(accessControl); + } + + + @Test + public void validateTest() { + accessControl.setCode(34); + accessValidator.validate(accessControl); + } + + @Test(expected=AclPlugRuntimeException.class) + public void validateAccessExceptionTest() { + accessControl.setCode(34); + accessControl.setAccount("Rocketmq"); + accessValidator.validate(accessControl); + } + + @Test(expected=AclPlugRuntimeException.class) + public void validatePasswordTest() { + accessControl.setCode(34); + accessControl.setPassword("123123123"); + accessValidator.validate(accessControl); + } + + @Test(expected=AclPlugRuntimeException.class) + public void validateCodeTest() { + accessControl.setCode(14434); + accessControl.setPassword("123123123"); + accessValidator.validate(accessControl); + } +} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 7a4c105e..d06949ca 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -160,7 +160,6 @@ public class BrokerController { private TransactionalMessageService transactionalMessageService; private AbstractTransactionalMessageCheckListener transactionalMessageCheckListener; - private AclPlugController aclPlugController; public BrokerController( final BrokerConfig brokerConfig, @@ -510,7 +509,7 @@ public class BrokerController { @Override public void doBeforeRequest(String remoteAddr, RemotingCommand request) { - validator.validate(validator.parse(request)); + validator.validate(validator.parse(request, remoteAddr)); } @Override @@ -1095,9 +1094,6 @@ public class BrokerController { this.transactionalMessageCheckListener = transactionalMessageCheckListener; } - public AclPlugController getAclPlugController() { - return this.aclPlugController; - } public BlockingQueue getEndTransactionThreadPoolQueue() { return endTransactionThreadPoolQueue; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java index f4ecc2c0..d536db50 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java @@ -72,9 +72,6 @@ public class ClientHousekeepingService implements ChannelEventListener { this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel); this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel); this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel); - if (this.brokerController.getAclPlugController() != null && this.brokerController.getAclPlugController().isStartSucceed()) { - this.brokerController.getAclPlugController().doChannelCloseEvent(remoteAddr); - } } @Override diff --git a/broker/src/main/resources/META-INF/service/org.apache.rocketmq.acl.AccessValidator b/broker/src/main/resources/META-INF/service/org.apache.rocketmq.acl.AccessValidator new file mode 100644 index 00000000..2f26220e --- /dev/null +++ b/broker/src/main/resources/META-INF/service/org.apache.rocketmq.acl.AccessValidator @@ -0,0 +1 @@ +org.apache.rocketmq.acl.plug.DefaultAclRemotingServiceImpl \ No newline at end of file diff --git a/broker/src/test/java/org/apache/rocketmq/broker/util/ServiceProviderTest.java b/broker/src/test/java/org/apache/rocketmq/broker/util/ServiceProviderTest.java index 22228a6e..a3a35c88 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/util/ServiceProviderTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/util/ServiceProviderTest.java @@ -17,12 +17,15 @@ package org.apache.rocketmq.broker.util; +import org.apache.rocketmq.acl.AccessValidator; import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener; import org.apache.rocketmq.broker.transaction.TransactionalMessageService; import org.junit.Test; import static org.assertj.core.api.Assertions.assertThat; +import java.util.List; + public class ServiceProviderTest { @Test @@ -38,4 +41,10 @@ public class ServiceProviderTest { AbstractTransactionalMessageCheckListener.class); assertThat(listener).isNotNull(); } + + @Test + public void loadAccessValidatorTest() { + List accessValidators = ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class); + assertThat(accessValidators).isNotNull(); + } } diff --git a/broker/src/test/resources/META-INF/service/org.apache.rocketmq.acl.AccessValidator b/broker/src/test/resources/META-INF/service/org.apache.rocketmq.acl.AccessValidator new file mode 100644 index 00000000..2f26220e --- /dev/null +++ b/broker/src/test/resources/META-INF/service/org.apache.rocketmq.acl.AccessValidator @@ -0,0 +1 @@ +org.apache.rocketmq.acl.plug.DefaultAclRemotingServiceImpl \ No newline at end of file diff --git a/distribution/conf/broker.conf b/distribution/conf/broker.conf index 363bcbc0..97039573 100644 --- a/distribution/conf/broker.conf +++ b/distribution/conf/broker.conf @@ -20,5 +20,5 @@ deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH -aclPlug=true +enableAcl=true namesrvAddr=127.0.0.1:9876 diff --git a/distribution/conf/transport.yml b/distribution/conf/transport.yml index 25d4902a..f86e68d4 100644 --- a/distribution/conf/transport.yml +++ b/distribution/conf/transport.yml @@ -14,7 +14,7 @@ # limitations under the License. onlyNetAddress: - netaddress: 10.10.103.* + netaddress: 127.0.0.* noPermitPullTopic: - broker-a @@ -31,4 +31,4 @@ list: permitSendTopic: - test3 - test4 - \ No newline at end of file + diff --git a/pom.xml b/pom.xml index 535893c2..38e518da 100644 --- a/pom.xml +++ b/pom.xml @@ -216,9 +216,9 @@ generate-effective-dependencies-pom generate-resources - + ${project.build.directory}/effective-pom/effective-dependencies.xml -- GitLab