未验证 提交 760c24ce 编写于 作者: Z Zhendong Liu 提交者: GitHub

Merge pull request #554 from githublaohu/feature_acl

Rewrite the design of acl feature
......@@ -18,46 +18,24 @@ package org.apache.rocketmq.acl;
import java.util.HashMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.AccessResource;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.acl.plug.AclRemotingService;
import org.apache.rocketmq.acl.plug.engine.AclPlugEngine;
import org.apache.rocketmq.acl.plug.engine.PlainAclPlugEngine;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.entity.AuthenticationResult;
import org.apache.rocketmq.acl.plug.entity.ControllerParameters;
import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
import org.apache.rocketmq.acl.plug.AccessControl;
import org.apache.rocketmq.acl.plug.AclPlugRuntimeException;
import org.apache.rocketmq.acl.plug.AuthenticationResult;
import org.apache.rocketmq.acl.plug.PlainAclPlugEngine;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class PlainAccessValidator implements AclRemotingService, AccessValidator {
public class PlainAccessValidator implements AccessValidator {
private AclPlugEngine aclPlugEngine;
private PlainAclPlugEngine aclPlugEngine;
public PlainAccessValidator() {
ControllerParameters controllerParameters = new ControllerParameters();
this.aclPlugEngine = new PlainAclPlugEngine(controllerParameters);
this.aclPlugEngine.initialize();
}
public PlainAccessValidator(AclPlugEngine aclPlugEngine) {
this.aclPlugEngine = aclPlugEngine;
}
@Override
public AuthenticationResult check(AccessControl accessControl) {
AuthenticationResult authenticationResult = aclPlugEngine.eachCheckLoginAndAuthentication(accessControl);
if (authenticationResult.getException() != null) {
throw new AclPlugRuntimeException(String.format("eachCheck the inspection appear exception, accessControl data is %s", accessControl.toString()), authenticationResult.getException());
}
if (authenticationResult.getAccessControl() == null || !authenticationResult.isSucceed()) {
throw new AclPlugRuntimeException(String.format("%s accessControl data is %s", authenticationResult.getResultString(), accessControl.toString()));
}
return authenticationResult;
aclPlugEngine = new PlainAclPlugEngine();
}
@Override
public AccessResource parse(RemotingCommand request, String remoteAddr) {
HashMap<String, String> extFields = request.getExtFields();
int code = request.getCode();
AccessControl accessControl = new AccessControl();
accessControl.setCode(request.getCode());
accessControl.setRecognition(remoteAddr);
......@@ -65,24 +43,32 @@ public class PlainAccessValidator implements AclRemotingService, AccessValidator
if (extFields != null) {
accessControl.setAccount(extFields.get("account"));
accessControl.setPassword(extFields.get("password"));
accessControl.setTopic(extFields.get("topic"));
if (code == 310 || code == 320) {
accessControl.setTopic(extFields.get("b"));
} else {
accessControl.setTopic(extFields.get("topic"));
}
}
return accessControl;
}
@Override
public void validate(AccessResource accessResource) {
AuthenticationResult authenticationResult = null;
try {
AuthenticationResult authenticationResult = aclPlugEngine.eachCheckAuthentication((AccessControl) accessResource);
if (authenticationResult.getException() != null) {
throw new AclPlugRuntimeException(String.format("eachCheck the inspection appear exception, accessControl data is %s", accessResource.toString()), authenticationResult.getException());
}
if (authenticationResult.getAccessControl() == null || !authenticationResult.isSucceed()) {
throw new AclPlugRuntimeException(String.format("%s accessControl data is %s", authenticationResult.getResultString(), accessResource.toString()));
}
authenticationResult = aclPlugEngine.eachCheckAuthentication((AccessControl) accessResource);
if (authenticationResult.isSucceed())
return;
} catch (Exception e) {
throw new AclPlugRuntimeException(String.format("validate exception AccessResource data %s", accessResource.toString()), e);
}
if (authenticationResult.getException() != null) {
throw new AclPlugRuntimeException(String.format("eachCheck the inspection appear exception, accessControl data is %s", accessResource.toString()), authenticationResult.getException());
}
if (authenticationResult.getAccessControl() != null || !authenticationResult.isSucceed()) {
throw new AclPlugRuntimeException(String.format("%s accessControl data is %s", authenticationResult.getResultString(), accessResource.toString()));
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
public class AccessContralAnalysis {
private Map<Class<?>, Map<Integer, Field>> classTocodeAndMentod = new HashMap<>();
private Map<String, Integer> fieldNameAndCode = new HashMap<>();
public void analysisClass(Class<?> clazz) {
Field[] fields = clazz.getDeclaredFields();
try {
for (Field field : fields) {
if (field.getType().equals(int.class)) {
String name = StringUtils.replace(field.getName(), "_", "").toLowerCase();
fieldNameAndCode.put(name, (Integer) field.get(null));
}
}
} catch (IllegalArgumentException | IllegalAccessException e) {
throw new AclPlugRuntimeException(String.format("analysis on failure Class is %s", clazz.getName()), e);
}
}
public Map<Integer, Boolean> analysis(AccessControl accessControl) {
Class<? extends AccessControl> clazz = accessControl.getClass();
Map<Integer, Field> codeAndField = classTocodeAndMentod.get(clazz);
if (codeAndField == null) {
codeAndField = new HashMap<>();
Field[] fields = clazz.getDeclaredFields();
for (Field field : fields) {
if (!field.getType().equals(boolean.class))
continue;
Integer code = fieldNameAndCode.get(field.getName().toLowerCase());
if (code == null) {
throw new AclPlugRuntimeException(String.format("field nonexistent in code fieldName is %s", field.getName()));
}
field.setAccessible(true);
codeAndField.put(code, field);
}
if (codeAndField.isEmpty()) {
throw new AclPlugRuntimeException(String.format("AccessControl nonexistent code , name %s", accessControl.getClass().getName()));
}
classTocodeAndMentod.put(clazz, codeAndField);
}
Iterator<Entry<Integer, Field>> it = codeAndField.entrySet().iterator();
Map<Integer, Boolean> authority = new HashMap<>();
try {
while (it.hasNext()) {
Entry<Integer, Field> e = it.next();
authority.put(e.getKey(), (Boolean) e.getValue().get(accessControl));
}
} catch (IllegalArgumentException | IllegalAccessException e) {
throw new AclPlugRuntimeException(String.format("analysis on failure AccessControl is %s", AccessControl.class.getName()), e);
}
return authority;
}
}
......@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug.entity;
package org.apache.rocketmq.acl.plug;
import org.apache.rocketmq.acl.AccessResource;
......@@ -87,8 +87,8 @@ public class AccessControl implements AccessResource {
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("AccessControl [account=").append(account).append(", password=").append(password)
.append(", netaddress=").append(netaddress).append(", recognition=").append(recognition)
.append(", code=").append(code).append(", topic=").append(topic).append("]");
.append(", netaddress=").append(netaddress).append(", recognition=").append(recognition)
.append(", code=").append(code).append(", topic=").append(topic).append("]");
return builder.toString();
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug;
import org.apache.rocketmq.acl.PlainAccessValidator;
import org.apache.rocketmq.acl.plug.engine.AclPlugEngine;
import org.apache.rocketmq.acl.plug.engine.PlainAclPlugEngine;
import org.apache.rocketmq.acl.plug.entity.ControllerParameters;
import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
public class AclPlugController {
private ControllerParameters controllerParameters;
private AclPlugEngine aclPlugEngine;
private AclRemotingService aclRemotingService;
private boolean startSucceed = false;
public AclPlugController(ControllerParameters controllerParameters) throws AclPlugRuntimeException {
try {
this.controllerParameters = controllerParameters;
aclPlugEngine = new PlainAclPlugEngine(controllerParameters);
aclPlugEngine.initialize();
aclRemotingService = new PlainAccessValidator(aclPlugEngine);
this.startSucceed = true;
} catch (Exception e) {
throw new AclPlugRuntimeException(String.format("Start the abnormal , Launch parameters is %s", this.controllerParameters.toString()), e);
}
}
public AclRemotingService getAclRemotingService() {
return this.aclRemotingService;
}
public void doChannelCloseEvent(String remoteAddr) {
if (this.startSucceed) {
aclPlugEngine.deleteLoginInfo(remoteAddr);
}
}
public boolean isStartSucceed() {
return startSucceed;
}
}
......@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug.exception;
package org.apache.rocketmq.acl.plug;
public class AclPlugRuntimeException extends RuntimeException {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.entity.AuthenticationResult;
public interface AclRemotingService {
public AuthenticationResult check(AccessControl accessControl);
}
......@@ -16,8 +16,11 @@
*/
package org.apache.rocketmq.acl.plug;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
import org.yaml.snakeyaml.Yaml;
public class AclUtils {
......@@ -79,4 +82,23 @@ public class AclUtils {
return minus.indexOf('-') > -1;
}
public static <T> T getYamlDataObject(String path, Class<T> clazz) {
Yaml ymal = new Yaml();
FileInputStream fis = null;
try {
fis = new FileInputStream(new File(path));
return ymal.loadAs(fis, clazz);
} catch (Exception e) {
throw new AclPlugRuntimeException(String.format("The transport.yml file for Plain mode was not found , paths %s", path), e);
} finally {
if (fis != null) {
try {
fis.close();
} catch (IOException e) {
throw new AclPlugRuntimeException("close transport fileInputStream Exception", e);
}
}
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.entity.AuthenticationInfo;
import org.apache.rocketmq.acl.plug.entity.AuthenticationResult;
import org.apache.rocketmq.acl.plug.entity.BorkerAccessControl;
public class Authentication {
public boolean authentication(AuthenticationInfo authenticationInfo,
AccessControl accessControl, AuthenticationResult authenticationResult) {
int code = accessControl.getCode();
if (!authenticationInfo.getAuthority().get(code)) {
authenticationResult.setResultString(String.format("code is %d Authentication failed", code));
return false;
}
if (!(authenticationInfo.getAccessControl() instanceof BorkerAccessControl)) {
return true;
}
BorkerAccessControl borker = (BorkerAccessControl) authenticationInfo.getAccessControl();
String topicName = accessControl.getTopic();
if (code == 10 || code == 310 || code == 320) {
if (borker.getPermitSendTopic().contains(topicName)) {
return true;
}
if (borker.getNoPermitSendTopic().contains(topicName)) {
authenticationResult.setResultString(String.format("noPermitSendTopic include %s", topicName));
return false;
}
return borker.getPermitSendTopic().isEmpty() ? true : false;
} else if (code == 11) {
if (borker.getPermitPullTopic().contains(topicName)) {
return true;
}
if (borker.getNoPermitPullTopic().contains(topicName)) {
authenticationResult.setResultString(String.format("noPermitPullTopic include %s", topicName));
return false;
}
return borker.getPermitPullTopic().isEmpty() ? true : false;
}
return true;
}
}
......@@ -14,9 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug.entity;
import org.apache.rocketmq.acl.plug.strategy.NetaddressStrategy;
package org.apache.rocketmq.acl.plug;
import java.util.Iterator;
import java.util.Map;
......@@ -31,7 +29,7 @@ public class AuthenticationInfo {
private Map<Integer, Boolean> authority;
public AuthenticationInfo(Map<Integer, Boolean> authority, AccessControl accessControl,
NetaddressStrategy netaddressStrategy) {
NetaddressStrategy netaddressStrategy) {
super();
this.authority = authority;
this.accessControl = accessControl;
......@@ -66,7 +64,7 @@ public class AuthenticationInfo {
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("AuthenticationInfo [accessControl=").append(accessControl).append(", netaddressStrategy=")
.append(netaddressStrategy).append(", authority={");
.append(netaddressStrategy).append(", authority={");
Iterator<Entry<Integer, Boolean>> it = authority.entrySet().iterator();
while (it.hasNext()) {
Entry<Integer, Boolean> e = it.next();
......
......@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug.entity;
package org.apache.rocketmq.acl.plug;
public class AuthenticationResult {
......
......@@ -14,12 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug.entity;
package org.apache.rocketmq.acl.plug;
import java.util.HashSet;
import java.util.Set;
public class BorkerAccessControl extends AccessControl {
public class BrokerAccessControl extends AccessControl {
private boolean admin;
private Set<String> permitSendTopic = new HashSet<>();
private Set<String> noPermitSendTopic = new HashSet<>();
......@@ -54,13 +56,13 @@ public class BorkerAccessControl extends AccessControl {
private boolean endTransaction = true;
private boolean updateAndCreateTopic = true;
private boolean updateAndCreateTopic = false;
private boolean deleteTopicInbroker = true;
private boolean deleteTopicInbroker = false;
private boolean getAllTopicConfig = true;
private boolean updateBrokerConfig = true;
private boolean updateBrokerConfig = false;
private boolean getBrokerConfig = true;
......@@ -78,11 +80,11 @@ public class BorkerAccessControl extends AccessControl {
private boolean unlockBatchMQ = true;
private boolean updateAndCreateSubscriptiongroup = true;
private boolean updateAndCreateSubscriptiongroup = false;
private boolean getAllSubscriptiongroupConfig = true;
private boolean deleteSubscriptiongroup = true;
private boolean deleteSubscriptiongroup = false;
private boolean getTopicStatsInfo = true;
......@@ -124,8 +126,16 @@ public class BorkerAccessControl extends AccessControl {
private boolean queryConsumeQueue = true;
public BorkerAccessControl() {
public BrokerAccessControl() {
}
public boolean isAdmin() {
return admin;
}
public void setAdmin(boolean admin) {
this.admin = admin;
}
public Set<String> getPermitSendTopic() {
......@@ -556,8 +566,8 @@ public class BorkerAccessControl extends AccessControl {
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("BorkerAccessControl [permitSendTopic=").append(permitSendTopic).append(", noPermitSendTopic=")
.append(noPermitSendTopic).append(", permitPullTopic=").append(permitPullTopic)
.append(", noPermitPullTopic=").append(noPermitPullTopic);
.append(noPermitSendTopic).append(", permitPullTopic=").append(permitPullTopic)
.append(", noPermitPullTopic=").append(noPermitPullTopic);
if (!!sendMessage)
builder.append(", sendMessage=").append(sendMessage);
if (!!sendMessageV2)
......
......@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.acl.plug;
public class AclPlugControllerTest {
public interface NetaddressStrategy {
public boolean match(AccessControl accessControl);
}
......@@ -14,15 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug.strategy;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.plug.AclUtils;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
package org.apache.rocketmq.acl.plug;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
public class NetaddressStrategyFactory {
......
......@@ -14,58 +14,155 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug.engine;
package org.apache.rocketmq.acl.plug;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.acl.plug.AccessContralAnalysis;
import org.apache.rocketmq.acl.plug.Authentication;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.entity.AuthenticationInfo;
import org.apache.rocketmq.acl.plug.entity.AuthenticationResult;
import org.apache.rocketmq.acl.plug.entity.BorkerAccessControlTransport;
import org.apache.rocketmq.acl.plug.entity.ControllerParameters;
import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
import org.apache.rocketmq.acl.plug.strategy.NetaddressStrategy;
import org.apache.rocketmq.acl.plug.strategy.NetaddressStrategyFactory;
import java.util.Map.Entry;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
public abstract class AuthenticationInfoManagementAclPlugEngine implements AclPlugEngine {
public class PlainAclPlugEngine {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.ACL_PLUG_LOGGER_NAME);
ControllerParameters controllerParameters;
private String fileHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY,
System.getenv(MixAll.ROCKETMQ_HOME_ENV));
private Map<String/** account **/, List<AuthenticationInfo>> accessControlMap = new HashMap<>();
private AuthenticationInfo authenticationInfo;
private NetaddressStrategyFactory netaddressStrategyFactory = new NetaddressStrategyFactory();
private AccessContralAnalysis accessContralAnalysis = new AccessContralAnalysis();
private Authentication authentication = new Authentication();
public AuthenticationInfoManagementAclPlugEngine(ControllerParameters controllerParameters) {
this.controllerParameters = controllerParameters;
accessContralAnalysis.analysisClass(controllerParameters.getAccessContralAnalysisClass());
private Class<?> accessContralAnalysisClass = RequestCode.class;
private boolean isWatchStart;
public PlainAclPlugEngine() {
initialize();
watch();
}
public void initialize() {
BrokerAccessControlTransport accessControlTransport = AclUtils.getYamlDataObject(fileHome + "/conf/transport.yml", BrokerAccessControlTransport.class);
if (accessControlTransport == null) {
throw new AclPlugRuntimeException("transport.yml file is no data");
}
log.info("BorkerAccessControlTransport data is : ", accessControlTransport.toString());
accessContralAnalysis.analysisClass(accessContralAnalysisClass);
setBrokerAccessControlTransport(accessControlTransport);
}
private void watch() {
String version = System.getProperty("java.version");
log.info("java.version is : {}", version);
String[] str = StringUtils.split(version, ".");
if (Integer.valueOf(str[1]) < 7) {
log.warn("wacth need jdk 1.7 support , current version no support");
return;
}
try {
final WatchService watcher = FileSystems.getDefault().newWatchService();
Path p = Paths.get(fileHome + "/conf/");
p.register(watcher, StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.ENTRY_CREATE);
ServiceThread watcherServcie = new ServiceThread() {
public void run() {
while (true) {
try {
while (true) {
WatchKey watchKey = watcher.take();
List<WatchEvent<?>> watchEvents = watchKey.pollEvents();
for (WatchEvent<?> event : watchEvents) {
if ("transport.yml".equals(event.context().toString()) &&
(StandardWatchEventKinds.ENTRY_MODIFY.equals(event.kind()) || StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind()))) {
log.info("transprot.yml make a difference change is : ", event.toString());
PlainAclPlugEngine.this.cleanAuthenticationInfo();
initialize();
}
}
watchKey.reset();
}
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
}
}
@Override
public String getServiceName() {
return "watcherServcie";
}
};
watcherServcie.start();
log.info("succeed start watcherServcie");
this.isWatchStart = true;
} catch (IOException e) {
log.error(e.getMessage(), e);
}
}
private void handleAccessControl(AccessControl accessControl) {
if (accessControl instanceof BrokerAccessControl) {
BrokerAccessControl brokerAccessControl = (BrokerAccessControl) accessControl;
if (brokerAccessControl.isAdmin()) {
brokerAccessControl.setUpdateAndCreateSubscriptiongroup(true);
brokerAccessControl.setDeleteSubscriptiongroup(true);
brokerAccessControl.setUpdateAndCreateTopic(true);
brokerAccessControl.setDeleteTopicInbroker(true);
brokerAccessControl.setUpdateBrokerConfig(true);
}
}
}
void cleanAuthenticationInfo() {
accessControlMap.clear();
authenticationInfo = null;
}
public void setAccessControl(AccessControl accessControl) throws AclPlugRuntimeException {
if (accessControl.getAccount() == null || accessControl.getPassword() == null || accessControl.getAccount().length() <= 6 || accessControl.getPassword().length() <= 6) {
throw new AclPlugRuntimeException(String.format("The account password cannot be null and is longer than 6, account is %s password is %s", accessControl.getAccount(), accessControl.getPassword()));
if (accessControl.getAccount() == null || accessControl.getPassword() == null
|| accessControl.getAccount().length() <= 6 || accessControl.getPassword().length() <= 6) {
throw new AclPlugRuntimeException(String.format(
"The account password cannot be null and is longer than 6, account is %s password is %s",
accessControl.getAccount(), accessControl.getPassword()));
}
try {
handleAccessControl(accessControl);
NetaddressStrategy netaddressStrategy = netaddressStrategyFactory.getNetaddressStrategy(accessControl);
List<AuthenticationInfo> accessControlAddressList = accessControlMap.get(accessControl.getAccount());
if (accessControlAddressList == null) {
accessControlAddressList = new ArrayList<>();
accessControlMap.put(accessControl.getAccount(), accessControlAddressList);
}
AuthenticationInfo authenticationInfo = new AuthenticationInfo(accessContralAnalysis.analysis(accessControl), accessControl, netaddressStrategy);
AuthenticationInfo authenticationInfo = new AuthenticationInfo(
accessContralAnalysis.analysis(accessControl), accessControl, netaddressStrategy);
accessControlAddressList.add(authenticationInfo);
log.info("authenticationInfo is {}", authenticationInfo.toString());
} catch (Exception e) {
throw new AclPlugRuntimeException(String.format("Exception info %s %s", e.getMessage(), accessControl.toString()), e);
throw new AclPlugRuntimeException(
String.format("Exception info %s %s", e.getMessage(), accessControl.toString()), e);
}
}
......@@ -101,38 +198,20 @@ public abstract class AuthenticationInfoManagementAclPlugEngine implements AclPl
return null;
}
@Override
public AuthenticationResult eachCheckLoginAndAuthentication(AccessControl accessControl) {
AuthenticationResult authenticationResult = new AuthenticationResult();
try {
AuthenticationInfo authenticationInfo = getAuthenticationInfo(accessControl, authenticationResult);
if (authenticationInfo != null) {
boolean boo = authentication.authentication(authenticationInfo, accessControl, authenticationResult);
authenticationResult.setSucceed(boo);
authenticationResult.setAccessControl(authenticationInfo.getAccessControl());
}
} catch (Exception e) {
authenticationResult.setException(e);
}
return authenticationResult;
}
public AuthenticationResult eachCheckAuthentication(AccessControl accessControl) {
AuthenticationResult authenticationResult = new AuthenticationResult();
AuthenticationInfo authenticationInfo = getAccessControl(accessControl);
if (authenticationInfo != null) {
boolean boo = authentication.authentication(authenticationInfo, accessControl, authenticationResult);
boolean boo = authentication(authenticationInfo, accessControl, authenticationResult);
authenticationResult.setSucceed(boo);
authenticationResult.setAccessControl(authenticationInfo.getAccessControl());
} else {
authenticationResult.setResultString("accessControl is null, Please check login, password, IP\"");
}
return authenticationResult;
}
void setBorkerAccessControlTransport(BorkerAccessControlTransport transport) {
void setBrokerAccessControlTransport(BrokerAccessControlTransport transport) {
if (transport.getOnlyNetAddress() == null && (transport.getList() == null || transport.getList().size() == 0)) {
throw new AclPlugRuntimeException("onlyNetAddress and list can't be all empty");
}
......@@ -141,12 +220,137 @@ public abstract class AuthenticationInfoManagementAclPlugEngine implements AclPl
this.setNetaddressAccessControl(transport.getOnlyNetAddress());
}
if (transport.getList() != null || transport.getList().size() > 0) {
for (AccessControl accessControl : transport.getList()) {
for (BrokerAccessControl accessControl : transport.getList()) {
this.setAccessControl(accessControl);
}
}
}
protected abstract AuthenticationInfo getAuthenticationInfo(AccessControl accessControl,
AuthenticationResult authenticationResult);
public boolean authentication(AuthenticationInfo authenticationInfo, AccessControl accessControl,
AuthenticationResult authenticationResult) {
int code = accessControl.getCode();
if (!authenticationInfo.getAuthority().get(code)) {
authenticationResult.setResultString(String.format("code is %d Authentication failed", code));
return false;
}
if (!(authenticationInfo.getAccessControl() instanceof BrokerAccessControl)) {
return true;
}
BrokerAccessControl borker = (BrokerAccessControl) authenticationInfo.getAccessControl();
String topicName = accessControl.getTopic();
if (code == 10 || code == 310 || code == 320) {
if (borker.getPermitSendTopic().contains(topicName)) {
return true;
}
if (borker.getNoPermitSendTopic().contains(topicName)) {
authenticationResult.setResultString(String.format("noPermitSendTopic include %s", topicName));
return false;
}
return borker.getPermitSendTopic().isEmpty() ? true : false;
} else if (code == 11) {
if (borker.getPermitPullTopic().contains(topicName)) {
return true;
}
if (borker.getNoPermitPullTopic().contains(topicName)) {
authenticationResult.setResultString(String.format("noPermitPullTopic include %s", topicName));
return false;
}
return borker.getPermitPullTopic().isEmpty() ? true : false;
}
return true;
}
public boolean isWatchStart() {
return isWatchStart;
}
public static class AccessContralAnalysis {
private Map<Class<?>, Map<Integer, Field>> classTocodeAndMentod = new HashMap<>();
private Map<String, Integer> fieldNameAndCode = new HashMap<>();
public void analysisClass(Class<?> clazz) {
Field[] fields = clazz.getDeclaredFields();
try {
for (Field field : fields) {
if (field.getType().equals(int.class)) {
String name = StringUtils.replace(field.getName(), "_", "").toLowerCase();
fieldNameAndCode.put(name, (Integer) field.get(null));
}
}
} catch (IllegalArgumentException | IllegalAccessException e) {
throw new AclPlugRuntimeException(String.format("analysis on failure Class is %s", clazz.getName()), e);
}
}
public Map<Integer, Boolean> analysis(AccessControl accessControl) {
Class<? extends AccessControl> clazz = accessControl.getClass();
Map<Integer, Field> codeAndField = classTocodeAndMentod.get(clazz);
if (codeAndField == null) {
codeAndField = new HashMap<>();
Field[] fields = clazz.getDeclaredFields();
for (Field field : fields) {
if ("admin".equals(field.getName()))
continue;
if (!field.getType().equals(boolean.class))
continue;
Integer code = fieldNameAndCode.get(field.getName().toLowerCase());
if (code == null) {
throw new AclPlugRuntimeException(
String.format("field nonexistent in code fieldName is %s", field.getName()));
}
field.setAccessible(true);
codeAndField.put(code, field);
}
if (codeAndField.isEmpty()) {
throw new AclPlugRuntimeException(String.format("AccessControl nonexistent code , name %s",
accessControl.getClass().getName()));
}
classTocodeAndMentod.put(clazz, codeAndField);
}
Iterator<Entry<Integer, Field>> it = codeAndField.entrySet().iterator();
Map<Integer, Boolean> authority = new HashMap<>();
try {
while (it.hasNext()) {
Entry<Integer, Field> e = it.next();
authority.put(e.getKey(), (Boolean) e.getValue().get(accessControl));
}
} catch (IllegalArgumentException | IllegalAccessException e) {
throw new AclPlugRuntimeException(
String.format("analysis on failure AccessControl is %s", AccessControl.class.getName()), e);
}
return authority;
}
}
public static class BrokerAccessControlTransport {
private BrokerAccessControl onlyNetAddress;
private List<BrokerAccessControl> list;
public BrokerAccessControl getOnlyNetAddress() {
return onlyNetAddress;
}
public void setOnlyNetAddress(BrokerAccessControl onlyNetAddress) {
this.onlyNetAddress = onlyNetAddress;
}
public List<BrokerAccessControl> getList() {
return list;
}
public void setList(List<BrokerAccessControl> list) {
this.list = list;
}
@Override
public String toString() {
return "BorkerAccessControlTransport [onlyNetAddress=" + onlyNetAddress + ", list=" + list + "]";
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug.engine;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.entity.AuthenticationInfo;
import org.apache.rocketmq.acl.plug.entity.AuthenticationResult;
import org.apache.rocketmq.acl.plug.entity.LoginInfo;
public interface AclPlugEngine {
public AuthenticationInfo getAccessControl(AccessControl accessControl);
public LoginInfo getLoginInfo(AccessControl accessControl);
public void deleteLoginInfo(String remoteAddr);
public AuthenticationResult eachCheckLoginAndAuthentication(AccessControl accessControl);
public AuthenticationResult eachCheckAuthentication(AccessControl accessControl);
public void initialize();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug.engine;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.entity.AuthenticationInfo;
import org.apache.rocketmq.acl.plug.entity.AuthenticationResult;
import org.apache.rocketmq.acl.plug.entity.ControllerParameters;
import org.apache.rocketmq.acl.plug.entity.LoginInfo;
public abstract class LoginInfoAclPlugEngine extends AuthenticationInfoManagementAclPlugEngine {
private Map<String, LoginInfo> loginInfoMap = new ConcurrentHashMap<>();
public LoginInfoAclPlugEngine(ControllerParameters controllerParameters) {
super(controllerParameters);
}
public LoginInfo getLoginInfo(AccessControl accessControl) {
LoginInfo loginInfo = loginInfoMap.get(accessControl.getRecognition());
if (loginInfo == null) {
AuthenticationInfo authenticationInfo = super.getAccessControl(accessControl);
if (authenticationInfo != null) {
loginInfo = new LoginInfo();
loginInfo.setAuthenticationInfo(authenticationInfo);
loginInfoMap.put(accessControl.getRecognition(), loginInfo);
}
}
if (loginInfo != null) {
loginInfo.setOperationTime(System.currentTimeMillis());
}
return loginInfo;
}
public void deleteLoginInfo(String remoteAddr) {
loginInfoMap.remove(remoteAddr);
}
protected AuthenticationInfo getAuthenticationInfo(AccessControl accessControl,
AuthenticationResult authenticationResult) {
LoginInfo loginInfo = getLoginInfo(accessControl);
if (loginInfo != null && loginInfo.getAuthenticationInfo() != null) {
return loginInfo.getAuthenticationInfo();
}
authenticationResult.setResultString("Login information does not exist, Please check login, password, IP");
return null;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug.engine;
import org.apache.rocketmq.acl.plug.entity.BorkerAccessControlTransport;
import org.apache.rocketmq.acl.plug.entity.ControllerParameters;
import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
import org.yaml.snakeyaml.Yaml;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
public class PlainAclPlugEngine extends LoginInfoAclPlugEngine {
public PlainAclPlugEngine(
ControllerParameters controllerParameters) throws AclPlugRuntimeException {
super(controllerParameters);
}
public void initialize() throws AclPlugRuntimeException {
String filePath = controllerParameters.getFileHome() + "/conf/transport.yml";
Yaml ymal = new Yaml();
FileInputStream fis = null;
BorkerAccessControlTransport transport;
try {
fis = new FileInputStream(new File(filePath));
transport = ymal.loadAs(fis, BorkerAccessControlTransport.class);
} catch (Exception e) {
throw new AclPlugRuntimeException(String.format("The transport.yml file for Plain mode was not found , paths %s", filePath), e);
} finally {
if (fis != null) {
try {
fis.close();
} catch (IOException e) {
throw new AclPlugRuntimeException("close transport fileInputStream Exception", e);
}
}
}
if (transport == null) {
throw new AclPlugRuntimeException("transport.yml file is no data");
}
super.setBorkerAccessControlTransport(transport);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug.entity;
import java.util.List;
public class BorkerAccessControlTransport {
private BorkerAccessControl onlyNetAddress;
private List<BorkerAccessControl> list;
public BorkerAccessControlTransport() {
super();
}
public BorkerAccessControl getOnlyNetAddress() {
return onlyNetAddress;
}
public void setOnlyNetAddress(BorkerAccessControl onlyNetAddress) {
this.onlyNetAddress = onlyNetAddress;
}
public List<BorkerAccessControl> getList() {
return list;
}
public void setList(List<BorkerAccessControl> list) {
this.list = list;
}
@Override
public String toString() {
return "BorkerAccessControlTransport [onlyNetAddress=" + onlyNetAddress + ", list=" + list + "]";
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug.entity;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.protocol.RequestCode;
public class ControllerParameters {
private String fileHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
private Class<?> accessContralAnalysisClass = RequestCode.class;
public String getFileHome() {
return fileHome;
}
public void setFileHome(String fileHome) {
this.fileHome = fileHome;
}
public Class<?> getAccessContralAnalysisClass() {
return accessContralAnalysisClass;
}
public void setAccessContralAnalysisClass(Class<?> accessContralAnalysisClass) {
this.accessContralAnalysisClass = accessContralAnalysisClass;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("ControllerParametersEntity [fileHome=").append(fileHome).append(", accessContralAnalysisClass=")
.append(accessContralAnalysisClass).append("]");
return builder.toString();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug.entity;
import java.util.concurrent.atomic.AtomicBoolean;
public class LoginInfo {
private String recognition;
private long loginTime = System.currentTimeMillis();
private volatile long operationTime = loginTime;
private volatile AtomicBoolean clear = new AtomicBoolean();
private AuthenticationInfo authenticationInfo;
public AuthenticationInfo getAuthenticationInfo() {
return authenticationInfo;
}
public void setAuthenticationInfo(AuthenticationInfo authenticationInfo) {
this.authenticationInfo = authenticationInfo;
}
public String getRecognition() {
return recognition;
}
public void setRecognition(String recognition) {
this.recognition = recognition;
}
public long getLoginTime() {
return loginTime;
}
public void setLoginTime(long loginTime) {
this.loginTime = loginTime;
}
public long getOperationTime() {
return operationTime;
}
public void setOperationTime(long operationTime) {
this.operationTime = operationTime;
}
public AtomicBoolean getClear() {
return clear;
}
public void setClear(AtomicBoolean clear) {
this.clear = clear;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("LoginInfo [recognition=").append(recognition).append(", loginTime=").append(loginTime)
.append(", operationTime=").append(operationTime).append(", clear=").append(clear)
.append(", authenticationInfo=").append(authenticationInfo).append("]");
return builder.toString();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug.strategy;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
public interface NetaddressStrategy {
public boolean match(AccessControl accessControl);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.entity.BorkerAccessControl;
import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class AccessContralAnalysisTest {
AccessContralAnalysis accessContralAnalysis = new AccessContralAnalysis();
@Before
public void init() {
accessContralAnalysis.analysisClass(RequestCode.class);
}
@Test
public void analysisTest() {
BorkerAccessControl accessControl = new BorkerAccessControl();
accessControl.setSendMessage(false);
Map<Integer, Boolean> map = accessContralAnalysis.analysis(accessControl);
Iterator<Entry<Integer, Boolean>> it = map.entrySet().iterator();
long num = 0;
while (it.hasNext()) {
Entry<Integer, Boolean> e = it.next();
if (!e.getValue()) {
Assert.assertEquals(e.getKey(), Integer.valueOf(10));
num++;
}
}
Assert.assertEquals(num, 1);
}
@Test(expected = AclPlugRuntimeException.class)
public void analysisExceptionTest() {
AccessControl accessControl = new AccessControl();
accessContralAnalysis.analysis(accessControl);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug;
import java.util.HashMap;
import org.apache.rocketmq.acl.AccessResource;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.acl.PlainAccessValidator;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.entity.AuthenticationResult;
import org.apache.rocketmq.acl.plug.entity.BorkerAccessControl;
import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;;
public class AclRemotingServiceTest {
AclRemotingService defaultAclService;
AccessValidator accessValidator;
AccessControl accessControl;
AccessControl accessControlTwo;
@Before
public void init() {
System.setProperty("rocketmq.home.dir", "src/test/resources");
PlainAccessValidator aclRemotingServiceImpl = new PlainAccessValidator();
defaultAclService = aclRemotingServiceImpl;
accessValidator = aclRemotingServiceImpl;
accessControl = new BorkerAccessControl();
accessControl.setAccount("RocketMQ");
accessControl.setPassword("1234567");
accessControl.setNetaddress("192.0.0.1");
accessControl.setRecognition("127.0.0.1:1");
accessControlTwo = new BorkerAccessControl();
accessControlTwo.setAccount("RocketMQ");
accessControlTwo.setPassword("1234567");
accessControlTwo.setNetaddress("192.0.2.1");
accessControlTwo.setRecognition("127.0.0.1:2");
}
@Test
public void defaultConstructorTest() {
System.setProperty("rocketmq.home.dir", "src/test/resources");
AclRemotingService defaultAclService = new PlainAccessValidator();
Assert.assertNotNull(defaultAclService);
}
@Test
public void parseTest() {
RemotingCommand remotingCommand = RemotingCommand.createResponseCommand(34, "");
HashMap<String, String> map = new HashMap<>();
map.put("account", "RocketMQ");
map.put("password", "123456");
map.put("topic", "test");
remotingCommand.setExtFields(map);
AccessResource accessResource = accessValidator.parse(remotingCommand, "127.0.0.1:123");
AccessControl accessControl = (AccessControl) accessResource;
AccessControl newAccessControl = new AccessControl();
newAccessControl.setAccount("RocketMQ");
newAccessControl.setPassword("123456");
newAccessControl.setTopic("test");
newAccessControl.setCode(34);
newAccessControl.setNetaddress("127.0.0.1");
newAccessControl.setRecognition("127.0.0.1:123");
Assert.assertEquals(accessControl.toString(), newAccessControl.toString());
}
@Test
public void checkTest() {
accessControl.setCode(34);
AuthenticationResult authenticationResult = defaultAclService.check(accessControl);
Assert.assertTrue(authenticationResult.isSucceed());
}
@Test(expected = AclPlugRuntimeException.class)
public void checkAccessExceptionTest() {
accessControl.setCode(34);
accessControl.setAccount("Rocketmq");
defaultAclService.check(accessControl);
}
@Test(expected = AclPlugRuntimeException.class)
public void checkPasswordTest() {
accessControl.setCode(34);
accessControl.setPassword("123123123");
defaultAclService.check(accessControl);
}
@Test(expected = AclPlugRuntimeException.class)
public void checkCodeTest() {
accessControl.setCode(14434);
accessControl.setPassword("123123123");
defaultAclService.check(accessControl);
}
@Test
public void validateTest() {
accessControl.setCode(34);
accessValidator.validate(accessControl);
}
@Test(expected = AclPlugRuntimeException.class)
public void validateAccessExceptionTest() {
accessControl.setCode(34);
accessControl.setAccount("Rocketmq");
accessValidator.validate(accessControl);
}
@Test(expected = AclPlugRuntimeException.class)
public void validatePasswordTest() {
accessControl.setCode(34);
accessControl.setPassword("123123123");
accessValidator.validate(accessControl);
}
@Test(expected = AclPlugRuntimeException.class)
public void validateCodeTest() {
accessControl.setCode(14434);
accessControl.setPassword("123123123");
accessValidator.validate(accessControl);
}
}
......@@ -18,12 +18,10 @@ package org.apache.rocketmq.acl.plug;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.junit.Assert;
import org.junit.Test;
public class AclUtilsTest {
@Test
......@@ -125,4 +123,8 @@ public class AclUtilsTest {
isMinus = AclUtils.isMinus("*");
Assert.assertFalse(isMinus);
}
public void getYamlDataObjectTest() {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.entity.AuthenticationInfo;
import org.apache.rocketmq.acl.plug.entity.AuthenticationResult;
import org.apache.rocketmq.acl.plug.entity.BorkerAccessControl;
import org.apache.rocketmq.acl.plug.strategy.NetaddressStrategyFactory;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class AuthenticationTest {
Authentication authentication = new Authentication();
AuthenticationInfo authenticationInfo;
BorkerAccessControl borkerAccessControl;
AuthenticationResult authenticationResult = new AuthenticationResult();
AccessControl accessControl = new AccessControl();
@Before
public void init() {
borkerAccessControl = new BorkerAccessControl();
//321
borkerAccessControl.setQueryConsumeQueue(false);
Set<String> permitSendTopic = new HashSet<>();
permitSendTopic.add("permitSendTopic");
borkerAccessControl.setPermitSendTopic(permitSendTopic);
Set<String> noPermitSendTopic = new HashSet<>();
noPermitSendTopic.add("noPermitSendTopic");
borkerAccessControl.setNoPermitSendTopic(noPermitSendTopic);
Set<String> permitPullTopic = new HashSet<>();
permitPullTopic.add("permitPullTopic");
borkerAccessControl.setPermitPullTopic(permitPullTopic);
Set<String> noPermitPullTopic = new HashSet<>();
noPermitPullTopic.add("noPermitPullTopic");
borkerAccessControl.setNoPermitPullTopic(noPermitPullTopic);
AccessContralAnalysis accessContralAnalysis = new AccessContralAnalysis();
accessContralAnalysis.analysisClass(RequestCode.class);
Map<Integer, Boolean> map = accessContralAnalysis.analysis(borkerAccessControl);
authenticationInfo = new AuthenticationInfo(map, borkerAccessControl, NetaddressStrategyFactory.NULL_NET_ADDRESS_STRATEGY);
}
@Test
public void authenticationTest() {
accessControl.setCode(317);
boolean isReturn = authentication.authentication(authenticationInfo, accessControl, authenticationResult);
Assert.assertTrue(isReturn);
accessControl.setCode(321);
isReturn = authentication.authentication(authenticationInfo, accessControl, authenticationResult);
Assert.assertFalse(isReturn);
accessControl.setCode(10);
accessControl.setTopic("permitSendTopic");
isReturn = authentication.authentication(authenticationInfo, accessControl, authenticationResult);
Assert.assertTrue(isReturn);
accessControl.setCode(310);
isReturn = authentication.authentication(authenticationInfo, accessControl, authenticationResult);
Assert.assertTrue(isReturn);
accessControl.setCode(320);
isReturn = authentication.authentication(authenticationInfo, accessControl, authenticationResult);
Assert.assertTrue(isReturn);
accessControl.setTopic("noPermitSendTopic");
isReturn = authentication.authentication(authenticationInfo, accessControl, authenticationResult);
Assert.assertFalse(isReturn);
accessControl.setTopic("nopermitSendTopic");
isReturn = authentication.authentication(authenticationInfo, accessControl, authenticationResult);
Assert.assertFalse(isReturn);
accessControl.setCode(11);
accessControl.setTopic("permitPullTopic");
isReturn = authentication.authentication(authenticationInfo, accessControl, authenticationResult);
Assert.assertTrue(isReturn);
accessControl.setTopic("noPermitPullTopic");
isReturn = authentication.authentication(authenticationInfo, accessControl, authenticationResult);
Assert.assertFalse(isReturn);
accessControl.setTopic("nopermitPullTopic");
isReturn = authentication.authentication(authenticationInfo, accessControl, authenticationResult);
Assert.assertFalse(isReturn);
}
@Test
public void isEmptyTest() {
accessControl.setCode(10);
accessControl.setTopic("absentTopic");
boolean isReturn = authentication.authentication(authenticationInfo, accessControl, authenticationResult);
Assert.assertFalse(isReturn);
Set<String> permitSendTopic = new HashSet<>();
borkerAccessControl.setPermitSendTopic(permitSendTopic);
isReturn = authentication.authentication(authenticationInfo, accessControl, authenticationResult);
Assert.assertTrue(isReturn);
accessControl.setCode(11);
isReturn = authentication.authentication(authenticationInfo, accessControl, authenticationResult);
Assert.assertFalse(isReturn);
borkerAccessControl.setPermitPullTopic(permitSendTopic);
isReturn = authentication.authentication(authenticationInfo, accessControl, authenticationResult);
Assert.assertTrue(isReturn);
}
}
......@@ -14,10 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug.strategy;
package org.apache.rocketmq.acl.plug;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
import org.junit.Assert;
import org.junit.Test;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.rocketmq.acl.plug.PlainAclPlugEngine.AccessContralAnalysis;
import org.apache.rocketmq.acl.plug.PlainAclPlugEngine.BrokerAccessControlTransport;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class PlainAclPlugEngineTest {
PlainAclPlugEngine plainAclPlugEngine;
AccessContralAnalysis accessContralAnalysis = new AccessContralAnalysis();
AccessControl accessControl;
AccessControl accessControlTwo;
AuthenticationInfo authenticationInfo;
BrokerAccessControl brokerAccessControl;
Set<Integer> adminCode = new HashSet<>();
@Before
public void init() throws NoSuchFieldException, SecurityException, IOException {
// UPDATE_AND_CREATE_TOPIC
adminCode.add(17);
// UPDATE_BROKER_CONFIG
adminCode.add(25);
// DELETE_TOPIC_IN_BROKER
adminCode.add(215);
// UPDATE_AND_CREATE_SUBSCRIPTIONGROUP
adminCode.add(200);
// DELETE_SUBSCRIPTIONGROUP
adminCode.add(207);
accessContralAnalysis.analysisClass(RequestCode.class);
brokerAccessControl = new BrokerAccessControl();
// 321
brokerAccessControl.setQueryConsumeQueue(false);
Set<String> permitSendTopic = new HashSet<>();
permitSendTopic.add("permitSendTopic");
brokerAccessControl.setPermitSendTopic(permitSendTopic);
Set<String> noPermitSendTopic = new HashSet<>();
noPermitSendTopic.add("noPermitSendTopic");
brokerAccessControl.setNoPermitSendTopic(noPermitSendTopic);
Set<String> permitPullTopic = new HashSet<>();
permitPullTopic.add("permitPullTopic");
brokerAccessControl.setPermitPullTopic(permitPullTopic);
Set<String> noPermitPullTopic = new HashSet<>();
noPermitPullTopic.add("noPermitPullTopic");
brokerAccessControl.setNoPermitPullTopic(noPermitPullTopic);
AccessContralAnalysis accessContralAnalysis = new AccessContralAnalysis();
accessContralAnalysis.analysisClass(RequestCode.class);
Map<Integer, Boolean> map = accessContralAnalysis.analysis(brokerAccessControl);
authenticationInfo = new AuthenticationInfo(map, brokerAccessControl, NetaddressStrategyFactory.NULL_NET_ADDRESS_STRATEGY);
System.setProperty("rocketmq.home.dir", "src/test/resources");
plainAclPlugEngine = new PlainAclPlugEngine();
accessControl = new BrokerAccessControl();
accessControl.setAccount("rokcetmq");
accessControl.setPassword("aliyun11");
accessControl.setNetaddress("127.0.0.1");
accessControl.setRecognition("127.0.0.1:1");
accessControlTwo = new BrokerAccessControl();
accessControlTwo.setAccount("rokcet1");
accessControlTwo.setPassword("aliyun1");
accessControlTwo.setNetaddress("127.0.0.1");
accessControlTwo.setRecognition("127.0.0.1:2");
}
@Test(expected = AclPlugRuntimeException.class)
public void accountNullTest() {
accessControl.setAccount(null);
plainAclPlugEngine.setAccessControl(accessControl);
}
@Test(expected = AclPlugRuntimeException.class)
public void accountThanTest() {
accessControl.setAccount("123");
plainAclPlugEngine.setAccessControl(accessControl);
}
@Test(expected = AclPlugRuntimeException.class)
public void passWordtNullTest() {
accessControl.setAccount(null);
plainAclPlugEngine.setAccessControl(accessControl);
}
@Test(expected = AclPlugRuntimeException.class)
public void passWordThanTest() {
accessControl.setAccount("123");
plainAclPlugEngine.setAccessControl(accessControl);
}
@Test(expected = AclPlugRuntimeException.class)
public void testPlainAclPlugEngineInit() {
System.setProperty("rocketmq.home.dir", "");
new PlainAclPlugEngine().initialize();
}
@Test
public void authenticationInfoOfSetAccessControl() {
plainAclPlugEngine.setAccessControl(accessControl);
AuthenticationInfo authenticationInfo = plainAclPlugEngine.getAccessControl(accessControl);
AccessControl getAccessControl = authenticationInfo.getAccessControl();
Assert.assertEquals(accessControl, getAccessControl);
AccessControl testAccessControl = new AccessControl();
testAccessControl.setAccount("rokcetmq");
testAccessControl.setPassword("aliyun11");
testAccessControl.setNetaddress("127.0.0.1");
testAccessControl.setRecognition("127.0.0.1:1");
testAccessControl.setAccount("rokcetmq1");
authenticationInfo = plainAclPlugEngine.getAccessControl(testAccessControl);
Assert.assertNull(authenticationInfo);
testAccessControl.setAccount("rokcetmq");
testAccessControl.setPassword("1234567");
authenticationInfo = plainAclPlugEngine.getAccessControl(testAccessControl);
Assert.assertNull(authenticationInfo);
testAccessControl.setNetaddress("127.0.0.2");
authenticationInfo = plainAclPlugEngine.getAccessControl(testAccessControl);
Assert.assertNull(authenticationInfo);
}
@Test
public void setAccessControlList() {
List<AccessControl> accessControlList = new ArrayList<>();
accessControlList.add(accessControl);
accessControlList.add(accessControlTwo);
plainAclPlugEngine.setAccessControlList(accessControlList);
AuthenticationInfo newAccessControl = plainAclPlugEngine.getAccessControl(accessControl);
Assert.assertEquals(accessControl, newAccessControl.getAccessControl());
newAccessControl = plainAclPlugEngine.getAccessControl(accessControlTwo);
Assert.assertEquals(accessControlTwo, newAccessControl.getAccessControl());
}
@Test
public void setNetaddressAccessControl() {
AccessControl accessControl = new BrokerAccessControl();
accessControl.setAccount("RocketMQ");
accessControl.setPassword("RocketMQ");
accessControl.setNetaddress("127.0.0.1");
plainAclPlugEngine.setAccessControl(accessControl);
plainAclPlugEngine.setNetaddressAccessControl(accessControl);
AuthenticationInfo authenticationInfo = plainAclPlugEngine.getAccessControl(accessControl);
AccessControl getAccessControl = authenticationInfo.getAccessControl();
Assert.assertEquals(accessControl, getAccessControl);
accessControl.setNetaddress("127.0.0.2");
authenticationInfo = plainAclPlugEngine.getAccessControl(accessControl);
Assert.assertNull(authenticationInfo);
}
public void eachCheckLoginAndAuthentication() {
}
@Test(expected = AclPlugRuntimeException.class)
public void BrokerAccessControlTransportTestNull() {
BrokerAccessControlTransport accessControlTransport = new BrokerAccessControlTransport();
plainAclPlugEngine.setBrokerAccessControlTransport(accessControlTransport);
}
@Test
public void BrokerAccessControlTransportTest() {
BrokerAccessControlTransport accessControlTransport = new BrokerAccessControlTransport();
List<BrokerAccessControl> list = new ArrayList<>();
list.add((BrokerAccessControl) this.accessControlTwo);
accessControlTransport.setOnlyNetAddress((BrokerAccessControl) this.accessControl);
accessControlTransport.setList(list);
plainAclPlugEngine.setBrokerAccessControlTransport(accessControlTransport);
AccessControl accessControl = new BrokerAccessControl();
accessControl.setAccount("RocketMQ");
accessControl.setPassword("RocketMQ");
accessControl.setNetaddress("127.0.0.1");
plainAclPlugEngine.setAccessControl(accessControl);
AuthenticationInfo authenticationInfo = plainAclPlugEngine.getAccessControl(accessControl);
Assert.assertNotNull(authenticationInfo.getAccessControl());
authenticationInfo = plainAclPlugEngine.getAccessControl(accessControlTwo);
Assert.assertEquals(accessControlTwo, authenticationInfo.getAccessControl());
}
@Test
public void authenticationTest() {
AuthenticationResult authenticationResult = new AuthenticationResult();
accessControl.setCode(317);
boolean isReturn = plainAclPlugEngine.authentication(authenticationInfo, accessControl, authenticationResult);
Assert.assertTrue(isReturn);
accessControl.setCode(321);
isReturn = plainAclPlugEngine.authentication(authenticationInfo, accessControl, authenticationResult);
Assert.assertFalse(isReturn);
accessControl.setCode(10);
accessControl.setTopic("permitSendTopic");
isReturn = plainAclPlugEngine.authentication(authenticationInfo, accessControl, authenticationResult);
Assert.assertTrue(isReturn);
accessControl.setCode(310);
isReturn = plainAclPlugEngine.authentication(authenticationInfo, accessControl, authenticationResult);
Assert.assertTrue(isReturn);
accessControl.setCode(320);
isReturn = plainAclPlugEngine.authentication(authenticationInfo, accessControl, authenticationResult);
Assert.assertTrue(isReturn);
accessControl.setTopic("noPermitSendTopic");
isReturn = plainAclPlugEngine.authentication(authenticationInfo, accessControl, authenticationResult);
Assert.assertFalse(isReturn);
accessControl.setTopic("nopermitSendTopic");
isReturn = plainAclPlugEngine.authentication(authenticationInfo, accessControl, authenticationResult);
Assert.assertFalse(isReturn);
accessControl.setCode(11);
accessControl.setTopic("permitPullTopic");
isReturn = plainAclPlugEngine.authentication(authenticationInfo, accessControl, authenticationResult);
Assert.assertTrue(isReturn);
accessControl.setTopic("noPermitPullTopic");
isReturn = plainAclPlugEngine.authentication(authenticationInfo, accessControl, authenticationResult);
Assert.assertFalse(isReturn);
accessControl.setTopic("nopermitPullTopic");
isReturn = plainAclPlugEngine.authentication(authenticationInfo, accessControl, authenticationResult);
Assert.assertFalse(isReturn);
}
@Test
public void isEmptyTest() {
AuthenticationResult authenticationResult = new AuthenticationResult();
accessControl.setCode(10);
accessControl.setTopic("absentTopic");
boolean isReturn = plainAclPlugEngine.authentication(authenticationInfo, accessControl, authenticationResult);
Assert.assertFalse(isReturn);
Set<String> permitSendTopic = new HashSet<>();
brokerAccessControl.setPermitSendTopic(permitSendTopic);
isReturn = plainAclPlugEngine.authentication(authenticationInfo, accessControl, authenticationResult);
Assert.assertTrue(isReturn);
accessControl.setCode(11);
isReturn = plainAclPlugEngine.authentication(authenticationInfo, accessControl, authenticationResult);
Assert.assertFalse(isReturn);
brokerAccessControl.setPermitPullTopic(permitSendTopic);
isReturn = plainAclPlugEngine.authentication(authenticationInfo, accessControl, authenticationResult);
Assert.assertTrue(isReturn);
}
@Test
public void adminBrokerAccessControlTest() {
BrokerAccessControl admin = new BrokerAccessControl();
admin.setAccount("adminTest");
admin.setPassword("adminTest");
admin.setNetaddress("127.0.0.1");
plainAclPlugEngine.setAccessControl(admin);
Assert.assertFalse(admin.isUpdateAndCreateTopic());
admin.setAdmin(true);
plainAclPlugEngine.setAccessControl(admin);
Assert.assertTrue(admin.isUpdateAndCreateTopic());
}
@Test
public void adminEachCheckAuthentication() {
BrokerAccessControl accessControl = new BrokerAccessControl();
accessControl.setAccount("RocketMQ1");
accessControl.setPassword("1234567");
accessControl.setNetaddress("127.0.0.1");
plainAclPlugEngine.setAccessControl(accessControl);
for (Integer code : adminCode) {
accessControl.setCode(code);
AuthenticationResult authenticationResult = plainAclPlugEngine.eachCheckAuthentication(accessControl);
Assert.assertFalse(authenticationResult.isSucceed());
}
plainAclPlugEngine.cleanAuthenticationInfo();
accessControl.setAdmin(true);
plainAclPlugEngine.setAccessControl(accessControl);
for (Integer code : adminCode) {
accessControl.setCode(code);
AuthenticationResult authenticationResult = plainAclPlugEngine.eachCheckAuthentication(accessControl);
Assert.assertTrue(authenticationResult.isSucceed());
}
}
@Test
public void cleanAuthenticationInfoTest() {
plainAclPlugEngine.setAccessControl(accessControl);
accessControl.setCode(202);
AuthenticationResult authenticationResult = plainAclPlugEngine.eachCheckAuthentication(accessControl);
Assert.assertTrue(authenticationResult.isSucceed());
plainAclPlugEngine.cleanAuthenticationInfo();
authenticationResult = plainAclPlugEngine.eachCheckAuthentication(accessControl);
Assert.assertFalse(authenticationResult.isSucceed());
}
@Test
public void isWatchStartTest() {
PlainAclPlugEngine plainAclPlugEngine = new PlainAclPlugEngine();
Assert.assertTrue(plainAclPlugEngine.isWatchStart());
System.setProperty("java.version", "1.6.11");
plainAclPlugEngine = new PlainAclPlugEngine();
Assert.assertFalse(plainAclPlugEngine.isWatchStart());
}
@Test
public void watchTest() throws IOException {
System.setProperty("rocketmq.home.dir", "src/test/resources/watch");
File file = new File("src/test/resources/watch/conf");
file.mkdirs();
File transport = new File("src/test/resources/watch/conf/transport.yml");
transport.createNewFile();
FileWriter writer = new FileWriter(transport);
writer.write("list:\r\n");
writer.write("- account: rokcetmq\r\n");
writer.write(" password: aliyun11\r\n");
writer.write(" netaddress: 127.0.0.1\r\n");
writer.flush();
writer.close();
PlainAclPlugEngine plainAclPlugEngine = new PlainAclPlugEngine();
accessControl.setCode(203);
AuthenticationResult authenticationResult = plainAclPlugEngine.eachCheckAuthentication(accessControl);
Assert.assertTrue(authenticationResult.isSucceed());
writer = new FileWriter(new File("src/test/resources/watch/conf/transport.yml"), true);
writer.write("- account: rokcet1\r\n");
writer.write(" password: aliyun1\r\n");
writer.write(" netaddress: 127.0.0.1\r\n");
writer.flush();
writer.close();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
accessControlTwo.setCode(203);
authenticationResult = plainAclPlugEngine.eachCheckAuthentication(accessControlTwo);
Assert.assertTrue(authenticationResult.isSucceed());
transport.delete();
file.delete();
file = new File("src/test/resources/watch");
file.delete();
}
@Test
public void analysisTest() {
BrokerAccessControl accessControl = new BrokerAccessControl();
accessControl.setSendMessage(false);
Map<Integer, Boolean> map = accessContralAnalysis.analysis(accessControl);
Iterator<Entry<Integer, Boolean>> it = map.entrySet().iterator();
long num = 0;
while (it.hasNext()) {
Entry<Integer, Boolean> e = it.next();
if (!e.getValue()) {
if (adminCode.contains(e.getKey())) {
continue;
}
Assert.assertEquals(e.getKey(), Integer.valueOf(10));
num++;
}
}
Assert.assertEquals(num, 1);
}
@Test(expected = AclPlugRuntimeException.class)
public void analysisExceptionTest() {
AccessControl accessControl = new AccessControl();
accessContralAnalysis.analysis(accessControl);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug.engine;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.entity.AuthenticationInfo;
import org.apache.rocketmq.acl.plug.entity.AuthenticationResult;
import org.apache.rocketmq.acl.plug.entity.BorkerAccessControl;
import org.apache.rocketmq.acl.plug.entity.BorkerAccessControlTransport;
import org.apache.rocketmq.acl.plug.entity.ControllerParameters;
import org.apache.rocketmq.acl.plug.entity.LoginInfo;
import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
import org.apache.rocketmq.common.MixAll;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.internal.util.reflection.FieldSetter;
import org.mockito.junit.MockitoJUnitRunner;
import org.yaml.snakeyaml.Yaml;
@RunWith(MockitoJUnitRunner.class)
public class PlainAclPlugEngineTest {
PlainAclPlugEngine plainAclPlugEngine;
BorkerAccessControlTransport transport;
AccessControl accessControl;
AccessControl accessControlTwo;
Map<String, LoginInfo> loginInfoMap;
@Before
public void init() throws NoSuchFieldException, SecurityException, IOException {
System.setProperty("rocketmq.home.dir", "src/test/resources");
ControllerParameters controllerParametersEntity = new ControllerParameters();
Yaml ymal = new Yaml();
transport = ymal.loadAs(new FileInputStream(new File(controllerParametersEntity.getFileHome()+"/conf/transport.yml")), BorkerAccessControlTransport.class);
plainAclPlugEngine = new PlainAclPlugEngine(controllerParametersEntity);
plainAclPlugEngine.initialize();
accessControl = new BorkerAccessControl();
accessControl.setAccount("rokcetmq");
accessControl.setPassword("aliyun11");
accessControl.setNetaddress("127.0.0.1");
accessControl.setRecognition("127.0.0.1:1");
accessControlTwo = new BorkerAccessControl();
accessControlTwo.setAccount("rokcet1");
accessControlTwo.setPassword("aliyun1");
accessControlTwo.setNetaddress("127.0.0.1");
accessControlTwo.setRecognition("127.0.0.1:2");
loginInfoMap = new ConcurrentHashMap<>();
FieldSetter.setField(plainAclPlugEngine, plainAclPlugEngine.getClass().getSuperclass().getDeclaredField("loginInfoMap"), loginInfoMap);
}
@Test(expected = AclPlugRuntimeException.class)
public void accountNullTest() {
accessControl.setAccount(null);
plainAclPlugEngine.setAccessControl(accessControl);
}
@Test(expected = AclPlugRuntimeException.class)
public void accountThanTest() {
accessControl.setAccount("123");
plainAclPlugEngine.setAccessControl(accessControl);
}
@Test(expected = AclPlugRuntimeException.class)
public void passWordtNullTest() {
accessControl.setAccount(null);
plainAclPlugEngine.setAccessControl(accessControl);
}
@Test(expected = AclPlugRuntimeException.class)
public void passWordThanTest() {
accessControl.setAccount("123");
plainAclPlugEngine.setAccessControl(accessControl);
}
@Test(expected = AclPlugRuntimeException.class)
public void testPlainAclPlugEngineInit() {
ControllerParameters controllerParametersEntity = new ControllerParameters();
controllerParametersEntity.setFileHome("");
new PlainAclPlugEngine(controllerParametersEntity).initialize();
}
@Test
public void authenticationInfoOfSetAccessControl() {
AuthenticationInfoManagementAclPlugEngine aclPlugEngine = (AuthenticationInfoManagementAclPlugEngine) plainAclPlugEngine;
aclPlugEngine.setAccessControl(accessControl);
AuthenticationInfo authenticationInfo = aclPlugEngine.getAccessControl(accessControl);
AccessControl getAccessControl = authenticationInfo.getAccessControl();
Assert.assertEquals(accessControl, getAccessControl);
AccessControl testAccessControl = new AccessControl();
testAccessControl.setAccount("rokcetmq");
testAccessControl.setPassword("aliyun11");
testAccessControl.setNetaddress("127.0.0.1");
testAccessControl.setRecognition("127.0.0.1:1");
testAccessControl.setAccount("rokcetmq1");
authenticationInfo = aclPlugEngine.getAccessControl(testAccessControl);
Assert.assertNull(authenticationInfo);
testAccessControl.setAccount("rokcetmq");
testAccessControl.setPassword("1234567");
authenticationInfo = aclPlugEngine.getAccessControl(testAccessControl);
Assert.assertNull(authenticationInfo);
testAccessControl.setNetaddress("127.0.0.2");
authenticationInfo = aclPlugEngine.getAccessControl(testAccessControl);
Assert.assertNull(authenticationInfo);
}
@Test
public void setAccessControlList() {
List<AccessControl> accessControlList = new ArrayList<>();
accessControlList.add(accessControl);
accessControlList.add(accessControlTwo);
plainAclPlugEngine.setAccessControlList(accessControlList);
AuthenticationInfoManagementAclPlugEngine aclPlugEngine = (AuthenticationInfoManagementAclPlugEngine) plainAclPlugEngine;
AuthenticationInfo newAccessControl = aclPlugEngine.getAccessControl(accessControl);
Assert.assertEquals(accessControl, newAccessControl.getAccessControl());
newAccessControl = aclPlugEngine.getAccessControl(accessControlTwo);
Assert.assertEquals(accessControlTwo, newAccessControl.getAccessControl());
}
@Test
public void setNetaddressAccessControl() {
AuthenticationInfoManagementAclPlugEngine aclPlugEngine = (AuthenticationInfoManagementAclPlugEngine) plainAclPlugEngine;
AccessControl accessControl = new BorkerAccessControl();
accessControl.setAccount("RocketMQ");
accessControl.setPassword("RocketMQ");
accessControl.setNetaddress("127.0.0.1");
aclPlugEngine.setAccessControl(accessControl);
aclPlugEngine.setNetaddressAccessControl(accessControl);
AuthenticationInfo authenticationInfo = aclPlugEngine.getAccessControl(accessControl);
AccessControl getAccessControl = authenticationInfo.getAccessControl();
Assert.assertEquals(accessControl, getAccessControl);
accessControl.setNetaddress("127.0.0.2");
authenticationInfo = aclPlugEngine.getAccessControl(accessControl);
Assert.assertNull(authenticationInfo);
}
public void eachCheckLoginAndAuthentication() {
}
@Test(expected = AclPlugRuntimeException.class)
public void borkerAccessControlTransportTestNull() {
plainAclPlugEngine.setBorkerAccessControlTransport(new BorkerAccessControlTransport());
}
@Test
public void borkerAccessControlTransportTest() {
BorkerAccessControlTransport borkerAccessControlTransprt = new BorkerAccessControlTransport();
borkerAccessControlTransprt.setOnlyNetAddress((BorkerAccessControl) this.accessControl);
List<BorkerAccessControl> list = new ArrayList<>();
list.add((BorkerAccessControl) this.accessControlTwo);
borkerAccessControlTransprt.setList(list);
plainAclPlugEngine.setBorkerAccessControlTransport(borkerAccessControlTransprt);
AuthenticationInfoManagementAclPlugEngine aclPlugEngine = (AuthenticationInfoManagementAclPlugEngine) plainAclPlugEngine;
AccessControl accessControl = new BorkerAccessControl();
accessControl.setAccount("RocketMQ");
accessControl.setPassword("RocketMQ");
accessControl.setNetaddress("127.0.0.1");
aclPlugEngine.setAccessControl(accessControl);
AuthenticationInfo authenticationInfo = aclPlugEngine.getAccessControl(accessControl);
Assert.assertNotNull(authenticationInfo.getAccessControl());
authenticationInfo = aclPlugEngine.getAccessControl(accessControlTwo);
Assert.assertEquals(accessControlTwo, authenticationInfo.getAccessControl());
}
@Test
public void getLoginInfo() {
plainAclPlugEngine.setAccessControl(accessControl);
LoginInfo loginInfo = plainAclPlugEngine.getLoginInfo(accessControl);
Assert.assertNotNull(loginInfo);
loginInfo = plainAclPlugEngine.getLoginInfo(accessControlTwo);
Assert.assertNull(loginInfo);
}
@Test
public void deleteLoginInfo() {
plainAclPlugEngine.setAccessControl(accessControl);
plainAclPlugEngine.getLoginInfo(accessControl);
LoginInfo loginInfo = loginInfoMap.get(accessControl.getRecognition());
Assert.assertNotNull(loginInfo);
plainAclPlugEngine.deleteLoginInfo(accessControl.getRecognition());
loginInfo = loginInfoMap.get(accessControl.getRecognition());
Assert.assertNull(loginInfo);
}
@Test
public void getAuthenticationInfo() {
AccessControl newAccessControl = new AccessControl();
newAccessControl.setAccount("rokcetmq");
newAccessControl.setPassword("aliyun11");
newAccessControl.setNetaddress("127.0.0.1");
newAccessControl.setRecognition("127.0.0.1:1");
AuthenticationResult authenticationResult = new AuthenticationResult();
plainAclPlugEngine.getAuthenticationInfo(newAccessControl, authenticationResult);
Assert.assertEquals("Login information does not exist, Please check login, password, IP", authenticationResult.getResultString());
plainAclPlugEngine.setAccessControl(accessControl);
AuthenticationInfo authenticationInfo = plainAclPlugEngine.getAuthenticationInfo(newAccessControl, authenticationResult);
Assert.assertNotNull(authenticationInfo);
}
}
......@@ -22,6 +22,7 @@ list:
- account: RocketMQ
password: 1234567
netaddress: 192.0.0.*
admin: true
permitSendTopic:
- test1
- test2
......
......@@ -1033,6 +1033,7 @@ public class BrokerController {
public void registerServerRPCHook(RPCHook rpcHook) {
getRemotingServer().registerRPCHook(rpcHook);
this.fastRemotingServer.registerRPCHook(rpcHook);
}
public RemotingServer getRemotingServer() {
......
......@@ -35,8 +35,8 @@ public class ServerUtil {
new Option("n", "namesrvAddr", true,
"Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876");
opt.setRequired(false);
options.addOption(opt);
options.addOption(opt);
return options;
}
......
......@@ -15,7 +15,8 @@
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
......@@ -60,5 +61,9 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
</dependency>
</dependencies>
</project>
......@@ -19,13 +19,17 @@ package org.apache.rocketmq.tools.command;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import ch.qos.logback.core.joran.spi.JoranException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.remoting.RPCHook;
......@@ -75,6 +79,7 @@ import org.apache.rocketmq.tools.command.topic.UpdateOrderConfCommand;
import org.apache.rocketmq.tools.command.topic.UpdateTopicPermSubCommand;
import org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;
public class MQAdminStartup {
protected static List<SubCommand> subCommandList = new ArrayList<SubCommand>();
......@@ -129,7 +134,7 @@ public class MQAdminStartup {
System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr);
}
cmd.execute(commandLine, options, rpcHook);
cmd.execute(commandLine, options, getAclRPCHook(commandLine));
} else {
System.out.printf("The sub command %s not exist.%n", args[0]);
}
......@@ -157,7 +162,7 @@ public class MQAdminStartup {
initCommand(new QueryMsgByKeySubCommand());
initCommand(new QueryMsgByUniqueKeySubCommand());
initCommand(new QueryMsgByOffsetSubCommand());
initCommand(new PrintMessageSubCommand());
initCommand(new PrintMessageByQueueCommand());
initCommand(new SendMsgStatusCommand());
......@@ -211,7 +216,6 @@ public class MQAdminStartup {
private static void printHelp() {
System.out.printf("The most commonly used mqadmin commands are:%n");
for (SubCommand cmd : subCommandList) {
System.out.printf(" %-20s %s%n", cmd.commandName(), cmd.commandDesc());
}
......@@ -243,4 +247,65 @@ public class MQAdminStartup {
public static void initCommand(SubCommand command) {
subCommandList.add(command);
}
public static RPCHook getAclRPCHook(CommandLine commandLine) {
String fileHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
File file = new File(fileHome + "/conf/tools.yml");
if (!file.exists()) {
System.out.printf("file %s is not exist \n", file.getPath());
return null;
}
Yaml ymal = new Yaml();
FileInputStream fis = null;
Map<String, Map<String, Object>> map = null;
try {
fis = new FileInputStream(file);
map = ymal.loadAs(fis, Map.class);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (fis != null) {
try {
fis.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
if (map == null || map.isEmpty()) {
System.out.printf("file %s is no data", file.getPath());
return null;
}
final Map<String, Map<String, Object>> newMap = map;
return new RPCHook() {
@Override
public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
System.out.printf("remoteAddr is %s code %d \n", remoteAddr, request.getCode());
String fastRemoteAddr = null;
if (remoteAddr != null) {
String[] ipAndPost = StringUtils.split(remoteAddr, ":");
Integer fastPost = Integer.valueOf(ipAndPost[1]) + 2;
fastRemoteAddr = ipAndPost[0] + ":" + fastPost.toString();
}
Map<String, Object> map;
if ((map = newMap.get(remoteAddr)) != null || (map = newMap.get(fastRemoteAddr)) != null || (map = newMap.get("all")) != null) {
HashMap<String, String> ext = request.getExtFields();
if (ext == null) {
ext = new HashMap<>();
request.setExtFields(ext);
}
ext.put("account", map.get("account").toString());
ext.put("password", map.get("password").toString());
}
}
@Override
public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
}
};
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册