提交 74f4213b 编写于 作者: L laohu

arrange

上级 aeea0215
...@@ -25,7 +25,7 @@ public interface AccessValidator { ...@@ -25,7 +25,7 @@ public interface AccessValidator {
* @param request * @param request
* @return * @return
*/ */
AccessResource parse(RemotingCommand request); AccessResource parse(RemotingCommand request,String remoteAddr);
/** /**
* Validate the access resource. * Validate the access resource.
......
...@@ -21,7 +21,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; ...@@ -21,7 +21,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class DefaultAccessValidator implements AccessValidator { public class DefaultAccessValidator implements AccessValidator {
@Override public AccessResource parse(RemotingCommand request) { @Override public AccessResource parse(RemotingCommand request,String remoteAddr ) {
return null; return null;
} }
......
...@@ -16,15 +16,29 @@ ...@@ -16,15 +16,29 @@
*/ */
package org.apache.rocketmq.acl.plug; package org.apache.rocketmq.acl.plug;
import java.util.HashMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.AccessResource;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.acl.plug.engine.AclPlugEngine; import org.apache.rocketmq.acl.plug.engine.AclPlugEngine;
import org.apache.rocketmq.acl.plug.engine.PlainAclPlugEngine;
import org.apache.rocketmq.acl.plug.entity.AccessControl; import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.entity.AuthenticationResult; import org.apache.rocketmq.acl.plug.entity.AuthenticationResult;
import org.apache.rocketmq.acl.plug.entity.ControllerParameters;
import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException; import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class DefaultAclRemotingServiceImpl implements AclRemotingService { public class DefaultAclRemotingServiceImpl implements AclRemotingService ,AccessValidator{
private AclPlugEngine aclPlugEngine; private AclPlugEngine aclPlugEngine;
public DefaultAclRemotingServiceImpl() {
ControllerParameters controllerParameters = new ControllerParameters();
this.aclPlugEngine = new PlainAclPlugEngine(controllerParameters);
this.aclPlugEngine.initialize();
}
public DefaultAclRemotingServiceImpl(AclPlugEngine aclPlugEngine) { public DefaultAclRemotingServiceImpl(AclPlugEngine aclPlugEngine) {
this.aclPlugEngine = aclPlugEngine; this.aclPlugEngine = aclPlugEngine;
} }
...@@ -41,4 +55,30 @@ public class DefaultAclRemotingServiceImpl implements AclRemotingService { ...@@ -41,4 +55,30 @@ public class DefaultAclRemotingServiceImpl implements AclRemotingService {
return authenticationResult; return authenticationResult;
} }
@Override
public AccessResource parse(RemotingCommand request ,String remoteAddr) {
HashMap<String, String> extFields = request.getExtFields();
AccessControl accessControl = new AccessControl();
accessControl.setCode(request.getCode());
accessControl.setRecognition(remoteAddr);
if (extFields != null) {
accessControl.setAccount(extFields.get("account"));
accessControl.setPassword(extFields.get("password"));
accessControl.setNetaddress(StringUtils.split(remoteAddr, ":")[0]);
accessControl.setTopic(extFields.get("topic"));
}
return accessControl;
}
@Override
public void validate(AccessResource accessResource) {
AuthenticationResult authenticationResult = aclPlugEngine.eachCheckAuthentication((AccessControl)accessResource);
if (authenticationResult.getException() != null) {
throw new AclPlugRuntimeException(String.format("eachCheck the inspection appear exception, accessControl data is %s", accessResource.toString()), authenticationResult.getException());
}
if (authenticationResult.getAccessControl() == null || !authenticationResult.isSucceed()) {
throw new AclPlugRuntimeException(String.format("%s accessControl data is %s", authenticationResult.getResultString(), accessResource.toString()));
}
}
} }
...@@ -31,5 +31,7 @@ public interface AclPlugEngine { ...@@ -31,5 +31,7 @@ public interface AclPlugEngine {
public AuthenticationResult eachCheckLoginAndAuthentication(AccessControl accessControl); public AuthenticationResult eachCheckLoginAndAuthentication(AccessControl accessControl);
public AuthenticationResult eachCheckAuthentication(AccessControl accessControl);
public void initialize(); public void initialize();
} }
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
*/ */
package org.apache.rocketmq.acl.plug.engine; package org.apache.rocketmq.acl.plug.engine;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
...@@ -37,7 +38,7 @@ public abstract class AuthenticationInfoManagementAclPlugEngine implements AclPl ...@@ -37,7 +38,7 @@ public abstract class AuthenticationInfoManagementAclPlugEngine implements AclPl
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.ACL_PLUG_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.ACL_PLUG_LOGGER_NAME);
ControllerParameters controllerParameters; ControllerParameters controllerParameters;
private Map<String/** account **/, Map<String/** netaddress **/, AuthenticationInfo>> accessControlMap = new HashMap<>(); private Map<String/** account **/, List<AuthenticationInfo>> accessControlMap = new HashMap<>();
private AuthenticationInfo authenticationInfo; private AuthenticationInfo authenticationInfo;
private NetaddressStrategyFactory netaddressStrategyFactory = new NetaddressStrategyFactory(); private NetaddressStrategyFactory netaddressStrategyFactory = new NetaddressStrategyFactory();
private AccessContralAnalysis accessContralAnalysis = new AccessContralAnalysis(); private AccessContralAnalysis accessContralAnalysis = new AccessContralAnalysis();
...@@ -54,13 +55,13 @@ public abstract class AuthenticationInfoManagementAclPlugEngine implements AclPl ...@@ -54,13 +55,13 @@ public abstract class AuthenticationInfoManagementAclPlugEngine implements AclPl
} }
try { try {
NetaddressStrategy netaddressStrategy = netaddressStrategyFactory.getNetaddressStrategy(accessControl); NetaddressStrategy netaddressStrategy = netaddressStrategyFactory.getNetaddressStrategy(accessControl);
Map<String, AuthenticationInfo> accessControlAddressMap = accessControlMap.get(accessControl.getAccount()); List<AuthenticationInfo> accessControlAddressList = accessControlMap.get(accessControl.getAccount());
if (accessControlAddressMap == null) { if (accessControlAddressList == null) {
accessControlAddressMap = new HashMap<>(); accessControlAddressList = new ArrayList<>();
accessControlMap.put(accessControl.getAccount(), accessControlAddressMap); accessControlMap.put(accessControl.getAccount(), accessControlAddressList);
} }
AuthenticationInfo authenticationInfo = new AuthenticationInfo(accessContralAnalysis.analysis(accessControl), accessControl, netaddressStrategy); AuthenticationInfo authenticationInfo = new AuthenticationInfo(accessContralAnalysis.analysis(accessControl), accessControl, netaddressStrategy);
accessControlAddressMap.put(accessControl.getNetaddress(), authenticationInfo); accessControlAddressList.add( authenticationInfo);
log.info("authenticationInfo is {}", authenticationInfo.toString()); log.info("authenticationInfo is {}", authenticationInfo.toString());
} catch (Exception e) { } catch (Exception e) {
throw new AclPlugRuntimeException(String.format("Exception info %s %s" ,e.getMessage() , accessControl.toString()), e); throw new AclPlugRuntimeException(String.format("Exception info %s %s" ,e.getMessage() , accessControl.toString()), e);
...@@ -84,24 +85,19 @@ public abstract class AuthenticationInfoManagementAclPlugEngine implements AclPl ...@@ -84,24 +85,19 @@ public abstract class AuthenticationInfoManagementAclPlugEngine implements AclPl
} }
public AuthenticationInfo getAccessControl(AccessControl accessControl) { public AuthenticationInfo getAccessControl(AccessControl accessControl) {
AuthenticationInfo existing = null;
if (accessControl.getAccount() == null && authenticationInfo != null) { if (accessControl.getAccount() == null && authenticationInfo != null) {
existing = authenticationInfo.getNetaddressStrategy().match(accessControl) ? authenticationInfo : null; return authenticationInfo.getNetaddressStrategy().match(accessControl) ? authenticationInfo : null;
} else { } else {
Map<String, AuthenticationInfo> accessControlAddressMap = accessControlMap.get(accessControl.getAccount()); List<AuthenticationInfo> accessControlAddressList = accessControlMap.get(accessControl.getAccount());
if (accessControlAddressMap != null) { if (accessControlAddressList != null) {
existing = accessControlAddressMap.get(accessControl.getNetaddress()); for(AuthenticationInfo ai : accessControlAddressList) {
if (existing == null) if(ai.getNetaddressStrategy().match(accessControl)&&ai.getAccessControl().getPassword().equals(accessControl.getPassword())) {
return null; return ai;
if (existing.getAccessControl().getPassword().equals(accessControl.getPassword())) {
if (existing.getNetaddressStrategy().match(accessControl)) {
return existing;
} }
} }
existing = null;
} }
} }
return existing; return null;
} }
@Override @Override
...@@ -112,6 +108,7 @@ public abstract class AuthenticationInfoManagementAclPlugEngine implements AclPl ...@@ -112,6 +108,7 @@ public abstract class AuthenticationInfoManagementAclPlugEngine implements AclPl
if (authenticationInfo != null) { if (authenticationInfo != null) {
boolean boo = authentication.authentication(authenticationInfo, accessControl, authenticationResult); boolean boo = authentication.authentication(authenticationInfo, accessControl, authenticationResult);
authenticationResult.setSucceed(boo); authenticationResult.setSucceed(boo);
authenticationResult.setAccessControl(authenticationInfo.getAccessControl());
} }
} catch (Exception e) { } catch (Exception e) {
authenticationResult.setException(e); authenticationResult.setException(e);
...@@ -119,6 +116,20 @@ public abstract class AuthenticationInfoManagementAclPlugEngine implements AclPl ...@@ -119,6 +116,20 @@ public abstract class AuthenticationInfoManagementAclPlugEngine implements AclPl
return authenticationResult; return authenticationResult;
} }
public AuthenticationResult eachCheckAuthentication(AccessControl accessControl) {
AuthenticationResult authenticationResult = new AuthenticationResult();
AuthenticationInfo authenticationInfo = getAccessControl(accessControl);
if(authenticationInfo != null) {
boolean boo = authentication.authentication(authenticationInfo, accessControl, authenticationResult);
authenticationResult.setSucceed(boo);
}else {
authenticationResult.setResultString("accessControl is null, Please check login, password, IP\"");
}
return authenticationResult;
}
void setBorkerAccessControlTransport(BorkerAccessControlTransport transport) { void setBorkerAccessControlTransport(BorkerAccessControlTransport transport) {
if (transport.getOnlyNetAddress() == null && (transport.getList() == null || transport.getList().size() == 0)) { if (transport.getOnlyNetAddress() == null && (transport.getList() == null || transport.getList().size() == 0)) {
throw new AclPlugRuntimeException("onlyNetAddress and list can't be all empty"); throw new AclPlugRuntimeException("onlyNetAddress and list can't be all empty");
......
...@@ -16,7 +16,9 @@ ...@@ -16,7 +16,9 @@
*/ */
package org.apache.rocketmq.acl.plug.entity; package org.apache.rocketmq.acl.plug.entity;
public class AccessControl { import org.apache.rocketmq.acl.AccessResource;
public class AccessControl implements AccessResource{
private String account; private String account;
......
...@@ -16,11 +16,12 @@ ...@@ -16,11 +16,12 @@
*/ */
package org.apache.rocketmq.acl.plug.entity; package org.apache.rocketmq.acl.plug.entity;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.RequestCode;
public class ControllerParameters { public class ControllerParameters {
private String fileHome; private String fileHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
private Class<?> accessContralAnalysisClass = RequestCode.class; private Class<?> accessContralAnalysisClass = RequestCode.class;
......
package org.apache.rocketmq.acl.plug;
import java.util.HashMap;
import org.apache.rocketmq.acl.AccessResource;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.entity.AuthenticationResult;
import org.apache.rocketmq.acl.plug.entity.BorkerAccessControl;
import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;;
public class AclRemotingServiceTest {
AclRemotingService defaultAclService;
AccessValidator accessValidator;
AccessControl accessControl;
AccessControl accessControlTwo;
@Before
public void init() {
System.setProperty("rocketmq.home.dir", "src/test/resources");
DefaultAclRemotingServiceImpl aclRemotingServiceImpl = new DefaultAclRemotingServiceImpl();
defaultAclService = aclRemotingServiceImpl;
accessValidator = aclRemotingServiceImpl;
accessControl = new BorkerAccessControl();
accessControl.setAccount("RocketMQ");
accessControl.setPassword("1234567");
accessControl.setNetaddress("192.0.0.1");
accessControl.setRecognition("127.0.0.1:1");
accessControlTwo = new BorkerAccessControl();
accessControlTwo.setAccount("RocketMQ");
accessControlTwo.setPassword("1234567");
accessControlTwo.setNetaddress("192.0.2.1");
accessControlTwo.setRecognition("127.0.0.1:2");
}
@Test
public void defaultConstructorTest() {
System.setProperty("rocketmq.home.dir", "src/test/resources");
AclRemotingService defaultAclService = new DefaultAclRemotingServiceImpl();
Assert.assertNotNull(defaultAclService);
}
@Test
public void parseTest() {
RemotingCommand remotingCommand = RemotingCommand.createResponseCommand(34, "");
HashMap<String ,String> map = new HashMap<>();
map.put("account", "RocketMQ");
map.put("password","123456");
map.put("topic","test");
remotingCommand.setExtFields(map);
AccessResource accessResource = accessValidator.parse(remotingCommand, "127.0.0.1:123");
AccessControl accessControl = (AccessControl)accessResource;
AccessControl newAccessControl = new AccessControl();
newAccessControl.setAccount("RocketMQ");
newAccessControl.setPassword("123456");
newAccessControl.setTopic("test");
newAccessControl.setCode(34);
newAccessControl.setNetaddress("127.0.0.1");
newAccessControl.setRecognition("127.0.0.1:123");
Assert.assertEquals(accessControl.toString(), newAccessControl.toString());
}
@Test
public void checkTest() {
accessControl.setCode(34);
AuthenticationResult authenticationResult = defaultAclService.check(accessControl);
Assert.assertTrue(authenticationResult.isSucceed());
}
@Test(expected=AclPlugRuntimeException.class)
public void checkAccessExceptionTest() {
accessControl.setCode(34);
accessControl.setAccount("Rocketmq");
defaultAclService.check(accessControl);
}
@Test(expected=AclPlugRuntimeException.class)
public void checkPasswordTest() {
accessControl.setCode(34);
accessControl.setPassword("123123123");
defaultAclService.check(accessControl);
}
@Test(expected=AclPlugRuntimeException.class)
public void checkCodeTest() {
accessControl.setCode(14434);
accessControl.setPassword("123123123");
defaultAclService.check(accessControl);
}
@Test
public void validateTest() {
accessControl.setCode(34);
accessValidator.validate(accessControl);
}
@Test(expected=AclPlugRuntimeException.class)
public void validateAccessExceptionTest() {
accessControl.setCode(34);
accessControl.setAccount("Rocketmq");
accessValidator.validate(accessControl);
}
@Test(expected=AclPlugRuntimeException.class)
public void validatePasswordTest() {
accessControl.setCode(34);
accessControl.setPassword("123123123");
accessValidator.validate(accessControl);
}
@Test(expected=AclPlugRuntimeException.class)
public void validateCodeTest() {
accessControl.setCode(14434);
accessControl.setPassword("123123123");
accessValidator.validate(accessControl);
}
}
...@@ -160,7 +160,6 @@ public class BrokerController { ...@@ -160,7 +160,6 @@ public class BrokerController {
private TransactionalMessageService transactionalMessageService; private TransactionalMessageService transactionalMessageService;
private AbstractTransactionalMessageCheckListener transactionalMessageCheckListener; private AbstractTransactionalMessageCheckListener transactionalMessageCheckListener;
private AclPlugController aclPlugController;
public BrokerController( public BrokerController(
final BrokerConfig brokerConfig, final BrokerConfig brokerConfig,
...@@ -510,7 +509,7 @@ public class BrokerController { ...@@ -510,7 +509,7 @@ public class BrokerController {
@Override @Override
public void doBeforeRequest(String remoteAddr, RemotingCommand request) { public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
validator.validate(validator.parse(request)); validator.validate(validator.parse(request, remoteAddr));
} }
@Override @Override
...@@ -1095,9 +1094,6 @@ public class BrokerController { ...@@ -1095,9 +1094,6 @@ public class BrokerController {
this.transactionalMessageCheckListener = transactionalMessageCheckListener; this.transactionalMessageCheckListener = transactionalMessageCheckListener;
} }
public AclPlugController getAclPlugController() {
return this.aclPlugController;
}
public BlockingQueue<Runnable> getEndTransactionThreadPoolQueue() { public BlockingQueue<Runnable> getEndTransactionThreadPoolQueue() {
return endTransactionThreadPoolQueue; return endTransactionThreadPoolQueue;
......
...@@ -72,9 +72,6 @@ public class ClientHousekeepingService implements ChannelEventListener { ...@@ -72,9 +72,6 @@ public class ClientHousekeepingService implements ChannelEventListener {
this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel); this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel); this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);
this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel); this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
if (this.brokerController.getAclPlugController() != null && this.brokerController.getAclPlugController().isStartSucceed()) {
this.brokerController.getAclPlugController().doChannelCloseEvent(remoteAddr);
}
} }
@Override @Override
......
org.apache.rocketmq.acl.plug.DefaultAclRemotingServiceImpl
\ No newline at end of file
...@@ -17,12 +17,15 @@ ...@@ -17,12 +17,15 @@
package org.apache.rocketmq.broker.util; package org.apache.rocketmq.broker.util;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener; import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
import org.apache.rocketmq.broker.transaction.TransactionalMessageService; import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
import org.junit.Test; import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import java.util.List;
public class ServiceProviderTest { public class ServiceProviderTest {
@Test @Test
...@@ -38,4 +41,10 @@ public class ServiceProviderTest { ...@@ -38,4 +41,10 @@ public class ServiceProviderTest {
AbstractTransactionalMessageCheckListener.class); AbstractTransactionalMessageCheckListener.class);
assertThat(listener).isNotNull(); assertThat(listener).isNotNull();
} }
@Test
public void loadAccessValidatorTest() {
List<AccessValidator> accessValidators = ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class);
assertThat(accessValidators).isNotNull();
}
} }
org.apache.rocketmq.acl.plug.DefaultAclRemotingServiceImpl
\ No newline at end of file
...@@ -20,5 +20,5 @@ deleteWhen = 04 ...@@ -20,5 +20,5 @@ deleteWhen = 04
fileReservedTime = 48 fileReservedTime = 48
brokerRole = ASYNC_MASTER brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH flushDiskType = ASYNC_FLUSH
aclPlug=true enableAcl=true
namesrvAddr=127.0.0.1:9876 namesrvAddr=127.0.0.1:9876
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
# limitations under the License. # limitations under the License.
onlyNetAddress: onlyNetAddress:
netaddress: 10.10.103.* netaddress: 127.0.0.*
noPermitPullTopic: noPermitPullTopic:
- broker-a - broker-a
......
...@@ -216,9 +216,9 @@ ...@@ -216,9 +216,9 @@
<execution> <execution>
<id>generate-effective-dependencies-pom</id> <id>generate-effective-dependencies-pom</id>
<phase>generate-resources</phase> <phase>generate-resources</phase>
<goals> <!-- <goals>
<goal>effective-pom</goal> <goal>effective-pom</goal>
</goals> </goals> -->
<configuration> <configuration>
<output>${project.build.directory}/effective-pom/effective-dependencies.xml</output> <output>${project.build.directory}/effective-pom/effective-dependencies.xml</output>
</configuration> </configuration>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册