未验证 提交 76ab7bdb 编写于 作者: Z Zhendong Liu 提交者: GitHub

Merge pull request #485 from githublaohu/develop-acl

Merge to branch feature_acl, and make it easy for others to polish
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. The ASF licenses this file to
You under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
<project
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
xmlns="http://maven.apache.org/POM/4.0.0">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.4.0-SNAPSHOT</version>
</parent>
<artifactId>rocketmq-acl-plug</artifactId>
<name>rocketmq-acl-plug ${project.version}</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-remoting</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-logging</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-common</artifactId>
</dependency>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>1.19</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
</dependencies>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
public class AccessContralAnalysis {
private Map<Class<?>, Map<Integer, Field>> classTocodeAndMentod = new HashMap<>();
private Map<String, Integer> fieldNameAndCode = new HashMap<>();
public void analysisClass(Class<?> clazz) {
Field[] fields = clazz.getDeclaredFields();
try {
for (Field field : fields) {
if (field.getType().equals(int.class)) {
String name = StringUtils.replace(field.getName(), "_", "").toLowerCase();
fieldNameAndCode.put(name, (Integer) field.get(null));
}
}
} catch (IllegalArgumentException | IllegalAccessException e) {
throw new AclPlugRuntimeException(String.format("analysis on failure Class is %s", clazz.getName()), e);
}
}
public Map<Integer, Boolean> analysis(AccessControl accessControl) {
Class<? extends AccessControl> clazz = accessControl.getClass();
Map<Integer, Field> codeAndField = classTocodeAndMentod.get(clazz);
if (codeAndField == null) {
codeAndField = new HashMap<>();
Field[] fields = clazz.getDeclaredFields();
for (Field field : fields) {
if (!field.getType().equals(boolean.class))
continue;
Integer code = fieldNameAndCode.get(field.getName().toLowerCase());
if (code == null) {
throw new AclPlugRuntimeException(String.format("field nonexistent in code fieldName is %s", field.getName()));
}
field.setAccessible(true);
codeAndField.put(code, field);
}
if (codeAndField.isEmpty()) {
throw new AclPlugRuntimeException(String.format("AccessControl nonexistent code , name %s", accessControl.getClass().getName()));
}
classTocodeAndMentod.put(clazz, codeAndField);
}
Iterator<Entry<Integer, Field>> it = codeAndField.entrySet().iterator();
Map<Integer, Boolean> authority = new HashMap<>();
try {
while (it.hasNext()) {
Entry<Integer, Field> e = it.next();
authority.put(e.getKey(), (Boolean) e.getValue().get(accessControl));
}
} catch (IllegalArgumentException | IllegalAccessException e) {
throw new AclPlugRuntimeException(String.format("analysis on failure AccessControl is %s", AccessControl.class.getName()), e);
}
return authority;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug;
import org.apache.rocketmq.acl.plug.engine.AclPlugEngine;
import org.apache.rocketmq.acl.plug.engine.PlainAclPlugEngine;
import org.apache.rocketmq.acl.plug.entity.ControllerParameters;
import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
public class AclPlugController {
private ControllerParameters controllerParameters;
private AclPlugEngine aclPlugEngine;
private AclRemotingService aclRemotingService;
private boolean startSucceed = false;
public AclPlugController(ControllerParameters controllerParameters) throws AclPlugRuntimeException {
try {
this.controllerParameters = controllerParameters;
aclPlugEngine = new PlainAclPlugEngine(controllerParameters);
aclPlugEngine.initialize();
aclRemotingService = new DefaultAclRemotingServiceImpl(aclPlugEngine);
this.startSucceed = true;
} catch (Exception e) {
throw new AclPlugRuntimeException(String.format("Start the abnormal , Launch parameters is %s", this.controllerParameters.toString()), e);
}
}
public AclRemotingService getAclRemotingService() {
return this.aclRemotingService;
}
public void doChannelCloseEvent(String remoteAddr) {
if (this.startSucceed) {
aclPlugEngine.deleteLoginInfo(remoteAddr);
}
}
public boolean isStartSucceed() {
return startSucceed;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.entity.AuthenticationResult;
public interface AclRemotingService {
public AuthenticationResult check(AccessControl accessControl);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
public class AclUtils {
public static void verify(String netaddress, int index) {
if (!AclUtils.isScope(netaddress, index)) {
throw new AclPlugRuntimeException(String.format("netaddress examine scope Exception netaddress is %s", netaddress));
}
}
public static String[] getAddreeStrArray(String netaddress, String four) {
String[] fourStrArray = StringUtils.split(four.substring(1, four.length() - 1), ",");
String address = netaddress.substring(0, netaddress.indexOf("{"));
String[] addreeStrArray = new String[fourStrArray.length];
for (int i = 0; i < fourStrArray.length; i++) {
addreeStrArray[i] = address + fourStrArray[i];
}
return addreeStrArray;
}
public static boolean isScope(String num, int index) {
String[] strArray = StringUtils.split(num, ".");
if (strArray.length != 4) {
return false;
}
return isScope(strArray, index);
}
public static boolean isScope(String[] num, int index) {
if (num.length <= index) {
}
for (int i = 0; i < index; i++) {
if (!isScope(num[i])) {
return false;
}
}
return true;
}
public static boolean isScope(String num) {
return isScope(Integer.valueOf(num.trim()));
}
public static boolean isScope(int num) {
return num >= 0 && num <= 255;
}
public static boolean isAsterisk(String asterisk) {
return asterisk.indexOf('*') > -1;
}
public static boolean isColon(String colon) {
return colon.indexOf(',') > -1;
}
public static boolean isMinus(String minus) {
return minus.indexOf('-') > -1;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.entity.AuthenticationInfo;
import org.apache.rocketmq.acl.plug.entity.AuthenticationResult;
import org.apache.rocketmq.acl.plug.entity.BorkerAccessControl;
public class Authentication {
public boolean authentication(AuthenticationInfo authenticationInfo,
AccessControl accessControl, AuthenticationResult authenticationResult) {
int code = accessControl.getCode();
if (!authenticationInfo.getAuthority().get(code)) {
authenticationResult.setResultString(String.format("code is %d Authentication failed", code));
return false;
}
if (!(authenticationInfo.getAccessControl() instanceof BorkerAccessControl)) {
return true;
}
BorkerAccessControl borker = (BorkerAccessControl) authenticationInfo.getAccessControl();
String topicName = accessControl.getTopic();
if (code == 10 || code == 310 || code == 320) {
if (borker.getPermitSendTopic().contains(topicName)) {
return true;
}
if (borker.getNoPermitSendTopic().contains(topicName)) {
authenticationResult.setResultString(String.format("noPermitSendTopic include %s", topicName));
return false;
}
return borker.getPermitSendTopic().isEmpty() ? true : false;
} else if (code == 11) {
if (borker.getPermitPullTopic().contains(topicName)) {
return true;
}
if (borker.getNoPermitPullTopic().contains(topicName)) {
authenticationResult.setResultString(String.format("noPermitPullTopic include %s", topicName));
return false;
}
return borker.getPermitPullTopic().isEmpty() ? true : false;
}
return true;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug;
import org.apache.rocketmq.acl.plug.engine.AclPlugEngine;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.entity.AuthenticationResult;
import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
public class DefaultAclRemotingServiceImpl implements AclRemotingService {
private AclPlugEngine aclPlugEngine;
public DefaultAclRemotingServiceImpl(AclPlugEngine aclPlugEngine) {
this.aclPlugEngine = aclPlugEngine;
}
@Override
public AuthenticationResult check(AccessControl accessControl) {
AuthenticationResult authenticationResult = aclPlugEngine.eachCheckLoginAndAuthentication(accessControl);
if (authenticationResult.getException() != null) {
throw new AclPlugRuntimeException(String.format("eachCheck the inspection appear exception, accessControl data is %s", accessControl.toString()), authenticationResult.getException());
}
if (authenticationResult.getAccessControl() == null || !authenticationResult.isSucceed()) {
throw new AclPlugRuntimeException(String.format("%s accessControl data is %s", authenticationResult.getResultString(), accessControl.toString()));
}
return authenticationResult;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug.engine;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.entity.AuthenticationInfo;
import org.apache.rocketmq.acl.plug.entity.AuthenticationResult;
import org.apache.rocketmq.acl.plug.entity.LoginInfo;
public interface AclPlugEngine {
public AuthenticationInfo getAccessControl(AccessControl accessControl);
public LoginInfo getLoginInfo(AccessControl accessControl);
public void deleteLoginInfo(String remoteAddr);
public AuthenticationResult eachCheckLoginAndAuthentication(AccessControl accessControl);
public void initialize();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug.engine;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.acl.plug.AccessContralAnalysis;
import org.apache.rocketmq.acl.plug.Authentication;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.entity.AuthenticationInfo;
import org.apache.rocketmq.acl.plug.entity.AuthenticationResult;
import org.apache.rocketmq.acl.plug.entity.BorkerAccessControlTransport;
import org.apache.rocketmq.acl.plug.entity.ControllerParameters;
import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
import org.apache.rocketmq.acl.plug.strategy.NetaddressStrategy;
import org.apache.rocketmq.acl.plug.strategy.NetaddressStrategyFactory;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
public abstract class AuthenticationInfoManagementAclPlugEngine implements AclPlugEngine {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.ACL_PLUG_LOGGER_NAME);
ControllerParameters controllerParameters;
private Map<String/** account **/, Map<String/** netaddress **/, AuthenticationInfo>> accessControlMap = new HashMap<>();
private AuthenticationInfo authenticationInfo;
private NetaddressStrategyFactory netaddressStrategyFactory = new NetaddressStrategyFactory();
private AccessContralAnalysis accessContralAnalysis = new AccessContralAnalysis();
private Authentication authentication = new Authentication();
public AuthenticationInfoManagementAclPlugEngine(ControllerParameters controllerParameters) {
this.controllerParameters = controllerParameters;
accessContralAnalysis.analysisClass(controllerParameters.getAccessContralAnalysisClass());
}
public void setAccessControl(AccessControl accessControl) throws AclPlugRuntimeException {
if (accessControl.getAccount() == null || accessControl.getPassword() == null || accessControl.getAccount().length() <= 6 || accessControl.getPassword().length() <= 6) {
throw new AclPlugRuntimeException(String.format("The account password cannot be null and is longer than 6, account is %s password is %s", accessControl.getAccount(), accessControl.getPassword()));
}
try {
NetaddressStrategy netaddressStrategy = netaddressStrategyFactory.getNetaddressStrategy(accessControl);
Map<String, AuthenticationInfo> accessControlAddressMap = accessControlMap.get(accessControl.getAccount());
if (accessControlAddressMap == null) {
accessControlAddressMap = new HashMap<>();
accessControlMap.put(accessControl.getAccount(), accessControlAddressMap);
}
AuthenticationInfo authenticationInfo = new AuthenticationInfo(accessContralAnalysis.analysis(accessControl), accessControl, netaddressStrategy);
accessControlAddressMap.put(accessControl.getNetaddress(), authenticationInfo);
log.info("authenticationInfo is {}", authenticationInfo.toString());
} catch (Exception e) {
throw new AclPlugRuntimeException(String.format("Exception info %s %s" ,e.getMessage() , accessControl.toString()), e);
}
}
public void setAccessControlList(List<AccessControl> accessControlList) throws AclPlugRuntimeException {
for (AccessControl accessControl : accessControlList) {
setAccessControl(accessControl);
}
}
public void setNetaddressAccessControl(AccessControl accessControl) throws AclPlugRuntimeException {
try {
authenticationInfo = new AuthenticationInfo(accessContralAnalysis.analysis(accessControl), accessControl, netaddressStrategyFactory.getNetaddressStrategy(accessControl));
log.info("default authenticationInfo is {}", authenticationInfo.toString());
} catch (Exception e) {
throw new AclPlugRuntimeException(accessControl.toString(), e);
}
}
public AuthenticationInfo getAccessControl(AccessControl accessControl) {
AuthenticationInfo existing = null;
if (accessControl.getAccount() == null && authenticationInfo != null) {
existing = authenticationInfo.getNetaddressStrategy().match(accessControl) ? authenticationInfo : null;
} else {
Map<String, AuthenticationInfo> accessControlAddressMap = accessControlMap.get(accessControl.getAccount());
if (accessControlAddressMap != null) {
existing = accessControlAddressMap.get(accessControl.getNetaddress());
if (existing == null)
return null;
if (existing.getAccessControl().getPassword().equals(accessControl.getPassword())) {
if (existing.getNetaddressStrategy().match(accessControl)) {
return existing;
}
}
existing = null;
}
}
return existing;
}
@Override
public AuthenticationResult eachCheckLoginAndAuthentication(AccessControl accessControl) {
AuthenticationResult authenticationResult = new AuthenticationResult();
try {
AuthenticationInfo authenticationInfo = getAuthenticationInfo(accessControl, authenticationResult);
if (authenticationInfo != null) {
boolean boo = authentication.authentication(authenticationInfo, accessControl, authenticationResult);
authenticationResult.setSucceed(boo);
}
} catch (Exception e) {
authenticationResult.setException(e);
}
return authenticationResult;
}
void setBorkerAccessControlTransport(BorkerAccessControlTransport transport) {
if (transport.getOnlyNetAddress() == null && (transport.getList() == null || transport.getList().size() == 0)) {
throw new AclPlugRuntimeException("onlyNetAddress and list can't be all empty");
}
if (transport.getOnlyNetAddress() != null) {
this.setNetaddressAccessControl(transport.getOnlyNetAddress());
}
if (transport.getList() != null || transport.getList().size() > 0) {
for (AccessControl accessControl : transport.getList()) {
this.setAccessControl(accessControl);
}
}
}
protected abstract AuthenticationInfo getAuthenticationInfo(AccessControl accessControl,
AuthenticationResult authenticationResult);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug.engine;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.entity.AuthenticationInfo;
import org.apache.rocketmq.acl.plug.entity.AuthenticationResult;
import org.apache.rocketmq.acl.plug.entity.ControllerParameters;
import org.apache.rocketmq.acl.plug.entity.LoginInfo;
public abstract class LoginInfoAclPlugEngine extends AuthenticationInfoManagementAclPlugEngine {
private Map<String, LoginInfo> loginInfoMap = new ConcurrentHashMap<>();
public LoginInfoAclPlugEngine(ControllerParameters controllerParameters) {
super(controllerParameters);
}
public LoginInfo getLoginInfo(AccessControl accessControl) {
LoginInfo loginInfo = loginInfoMap.get(accessControl.getRecognition());
if (loginInfo == null) {
AuthenticationInfo authenticationInfo = super.getAccessControl(accessControl);
if (authenticationInfo != null) {
loginInfo = new LoginInfo();
loginInfo.setAuthenticationInfo(authenticationInfo);
loginInfoMap.put(accessControl.getRecognition(), loginInfo);
}
}
if (loginInfo != null) {
loginInfo.setOperationTime(System.currentTimeMillis());
}
return loginInfo;
}
public void deleteLoginInfo(String remoteAddr) {
loginInfoMap.remove(remoteAddr);
}
protected AuthenticationInfo getAuthenticationInfo(AccessControl accessControl,
AuthenticationResult authenticationResult) {
LoginInfo loginInfo = getLoginInfo(accessControl);
if (loginInfo != null && loginInfo.getAuthenticationInfo() != null) {
return loginInfo.getAuthenticationInfo();
}
authenticationResult.setResultString("Login information does not exist, Please check login, password, IP");
return null;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug.engine;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import org.apache.rocketmq.acl.plug.entity.BorkerAccessControlTransport;
import org.apache.rocketmq.acl.plug.entity.ControllerParameters;
import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
import org.yaml.snakeyaml.Yaml;
public class PlainAclPlugEngine extends LoginInfoAclPlugEngine {
public PlainAclPlugEngine(
ControllerParameters controllerParameters) throws AclPlugRuntimeException {
super(controllerParameters);
}
public void initialize() throws AclPlugRuntimeException {
String filePath = controllerParameters.getFileHome() + "/conf/transport.yml";
Yaml ymal = new Yaml();
FileInputStream fis = null;
BorkerAccessControlTransport transport;
try {
fis = new FileInputStream(new File(filePath));
transport = ymal.loadAs(fis, BorkerAccessControlTransport.class);
} catch (Exception e) {
throw new AclPlugRuntimeException(String.format("The transport.yml file for Plain mode was not found , paths %s", filePath), e);
} finally {
if (fis != null) {
try {
fis.close();
} catch (IOException e) {
throw new AclPlugRuntimeException("close transport fileInputStream Exception", e);
}
}
}
if (transport == null) {
throw new AclPlugRuntimeException("transport.yml file is no data");
}
super.setBorkerAccessControlTransport(transport);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug.entity;
public class AccessControl {
private String account;
private String password;
private String netaddress;
private String recognition;
private int code;
private String topic;
public AccessControl() {
}
public String getAccount() {
return account;
}
public void setAccount(String account) {
this.account = account;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getNetaddress() {
return netaddress;
}
public void setNetaddress(String netaddress) {
this.netaddress = netaddress;
}
public String getRecognition() {
return recognition;
}
public void setRecognition(String recognition) {
this.recognition = recognition;
}
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("AccessControl [account=").append(account).append(", password=").append(password)
.append(", netaddress=").append(netaddress).append(", recognition=").append(recognition)
.append(", code=").append(code).append(", topic=").append(topic).append("]");
return builder.toString();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug.entity;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.rocketmq.acl.plug.strategy.NetaddressStrategy;
public class AuthenticationInfo {
private AccessControl accessControl;
private NetaddressStrategy netaddressStrategy;
private Map<Integer, Boolean> authority;
public AuthenticationInfo(Map<Integer, Boolean> authority, AccessControl accessControl,
NetaddressStrategy netaddressStrategy) {
super();
this.authority = authority;
this.accessControl = accessControl;
this.netaddressStrategy = netaddressStrategy;
}
public AccessControl getAccessControl() {
return accessControl;
}
public void setAccessControl(AccessControl accessControl) {
this.accessControl = accessControl;
}
public NetaddressStrategy getNetaddressStrategy() {
return netaddressStrategy;
}
public void setNetaddressStrategy(NetaddressStrategy netaddressStrategy) {
this.netaddressStrategy = netaddressStrategy;
}
public Map<Integer, Boolean> getAuthority() {
return authority;
}
public void setAuthority(Map<Integer, Boolean> authority) {
this.authority = authority;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("AuthenticationInfo [accessControl=").append(accessControl).append(", netaddressStrategy=")
.append(netaddressStrategy).append(", authority={");
Iterator<Entry<Integer, Boolean>> it = authority.entrySet().iterator();
while (it.hasNext()) {
Entry<Integer, Boolean> e = it.next();
if (!e.getValue()) {
builder.append(e.getKey().toString()).append(":").append(e.getValue()).append(",");
}
}
builder.append("}]");
return builder.toString();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug.entity;
public class AuthenticationResult {
private AccessControl accessControl;
private boolean succeed;
private Exception exception;
private String resultString;
public AccessControl getAccessControl() {
return accessControl;
}
public void setAccessControl(AccessControl accessControl) {
this.accessControl = accessControl;
}
public boolean isSucceed() {
return succeed;
}
public void setSucceed(boolean succeed) {
this.succeed = succeed;
}
public Exception getException() {
return exception;
}
public void setException(Exception exception) {
this.exception = exception;
}
public String getResultString() {
return resultString;
}
public void setResultString(String resultString) {
this.resultString = resultString;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug.entity;
import java.util.List;
public class BorkerAccessControlTransport {
private BorkerAccessControl onlyNetAddress;
private List<BorkerAccessControl> list;
public BorkerAccessControlTransport() {
super();
}
public BorkerAccessControl getOnlyNetAddress() {
return onlyNetAddress;
}
public void setOnlyNetAddress(BorkerAccessControl onlyNetAddress) {
this.onlyNetAddress = onlyNetAddress;
}
public List<BorkerAccessControl> getList() {
return list;
}
public void setList(List<BorkerAccessControl> list) {
this.list = list;
}
@Override
public String toString() {
return "BorkerAccessControlTransport [onlyNetAddress=" + onlyNetAddress + ", list=" + list + "]";
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug.entity;
import org.apache.rocketmq.common.protocol.RequestCode;
public class ControllerParameters {
private String fileHome;
private Class<?> accessContralAnalysisClass = RequestCode.class;
public String getFileHome() {
return fileHome;
}
public void setFileHome(String fileHome) {
this.fileHome = fileHome;
}
public Class<?> getAccessContralAnalysisClass() {
return accessContralAnalysisClass;
}
public void setAccessContralAnalysisClass(Class<?> accessContralAnalysisClass) {
this.accessContralAnalysisClass = accessContralAnalysisClass;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("ControllerParametersEntity [fileHome=").append(fileHome).append(", accessContralAnalysisClass=")
.append(accessContralAnalysisClass).append("]");
return builder.toString();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug.entity;
import java.util.concurrent.atomic.AtomicBoolean;
public class LoginInfo {
private String recognition;
private long loginTime = System.currentTimeMillis();
private volatile long operationTime = loginTime;
private volatile AtomicBoolean clear = new AtomicBoolean();
private AuthenticationInfo authenticationInfo;
public AuthenticationInfo getAuthenticationInfo() {
return authenticationInfo;
}
public void setAuthenticationInfo(AuthenticationInfo authenticationInfo) {
this.authenticationInfo = authenticationInfo;
}
public String getRecognition() {
return recognition;
}
public void setRecognition(String recognition) {
this.recognition = recognition;
}
public long getLoginTime() {
return loginTime;
}
public void setLoginTime(long loginTime) {
this.loginTime = loginTime;
}
public long getOperationTime() {
return operationTime;
}
public void setOperationTime(long operationTime) {
this.operationTime = operationTime;
}
public AtomicBoolean getClear() {
return clear;
}
public void setClear(AtomicBoolean clear) {
this.clear = clear;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("LoginInfo [recognition=").append(recognition).append(", loginTime=").append(loginTime)
.append(", operationTime=").append(operationTime).append(", clear=").append(clear)
.append(", authenticationInfo=").append(authenticationInfo).append("]");
return builder.toString();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug.exception;
public class AclPlugRuntimeException extends RuntimeException {
private static final long serialVersionUID = 6062101368637228900L;
public AclPlugRuntimeException(String message) {
super(message);
}
public AclPlugRuntimeException(String message, Throwable cause) {
super(message, cause);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug.strategy;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
public interface NetaddressStrategy {
public boolean match(AccessControl accessControl);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug.strategy;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.plug.AclUtils;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
public class NetaddressStrategyFactory {
public static final NullNetaddressStrategy NULL_NET_ADDRESS_STRATEGY = new NullNetaddressStrategy();
public NetaddressStrategy getNetaddressStrategy(AccessControl accessControl) {
String netaddress = accessControl.getNetaddress();
if (StringUtils.isBlank(netaddress) || "*".equals(netaddress)) {
return NULL_NET_ADDRESS_STRATEGY;
}
if (netaddress.endsWith("}")) {
String[] strArray = StringUtils.split(netaddress, ".");
String four = strArray[3];
if (!four.startsWith("{")) {
throw new AclPlugRuntimeException(String.format("MultipleNetaddressStrategy netaddress examine scope Exception netaddress", netaddress));
}
return new MultipleNetaddressStrategy(AclUtils.getAddreeStrArray(netaddress, four));
} else if (AclUtils.isColon(netaddress)) {
return new MultipleNetaddressStrategy(StringUtils.split(netaddress, ","));
} else if (AclUtils.isAsterisk(netaddress) || AclUtils.isMinus(netaddress)) {
return new RangeNetaddressStrategy(netaddress);
}
return new OneNetaddressStrategy(netaddress);
}
public static class NullNetaddressStrategy implements NetaddressStrategy {
@Override
public boolean match(AccessControl accessControl) {
return true;
}
}
public static class MultipleNetaddressStrategy implements NetaddressStrategy {
private final Set<String> multipleSet = new HashSet<>();
public MultipleNetaddressStrategy(String[] strArray) {
for (String netaddress : strArray) {
AclUtils.verify(netaddress, 4);
multipleSet.add(netaddress);
}
}
@Override
public boolean match(AccessControl accessControl) {
return multipleSet.contains(accessControl.getNetaddress());
}
}
public static class OneNetaddressStrategy implements NetaddressStrategy {
private String netaddress;
public OneNetaddressStrategy(String netaddress) {
this.netaddress = netaddress;
AclUtils.verify(netaddress, 4);
}
@Override
public boolean match(AccessControl accessControl) {
return netaddress.equals(accessControl.getNetaddress());
}
}
public static class RangeNetaddressStrategy implements NetaddressStrategy {
private String head;
private int start;
private int end;
private int index;
public RangeNetaddressStrategy(String netaddress) {
String[] strArray = StringUtils.split(netaddress, ".");
if (analysis(strArray, 2) || analysis(strArray, 3)) {
AclUtils.verify(netaddress, index - 1);
StringBuffer sb = new StringBuffer().append(strArray[0].trim()).append(".").append(strArray[1].trim()).append(".");
if (index == 3) {
sb.append(strArray[2].trim()).append(".");
}
this.head = sb.toString();
}
}
private boolean analysis(String[] strArray, int index) {
String value = strArray[index].trim();
this.index = index;
if ("*".equals(value)) {
setValue(0, 255);
} else if (AclUtils.isMinus(value)) {
if (value.indexOf("-") == 0) {
throw new AclPlugRuntimeException(String.format("RangeNetaddressStrategy netaddress examine scope Exception value %s ", value));
}
String[] valueArray = StringUtils.split(value, "-");
this.start = Integer.valueOf(valueArray[0]);
this.end = Integer.valueOf(valueArray[1]);
if (!(AclUtils.isScope(end) && AclUtils.isScope(start) && start <= end)) {
throw new AclPlugRuntimeException(String.format("RangeNetaddressStrategy netaddress examine scope Exception start is %s , end is %s", start, end));
}
}
return this.end > 0 ? true : false;
}
private void setValue(int start, int end) {
this.start = start;
this.end = end;
}
@Override
public boolean match(AccessControl accessControl) {
String netAddress = accessControl.getNetaddress();
if (netAddress.startsWith(this.head)) {
String value;
if (index == 3) {
value = netAddress.substring(this.head.length());
} else {
value = netAddress.substring(this.head.length(), netAddress.lastIndexOf('.'));
}
Integer address = Integer.valueOf(value);
if (address >= this.start && address <= this.end) {
return true;
}
}
return false;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.entity.BorkerAccessControl;
import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class AccessContralAnalysisTest {
AccessContralAnalysis accessContralAnalysis = new AccessContralAnalysis();
@Before
public void init() {
accessContralAnalysis.analysisClass(RequestCode.class);
}
@Test
public void analysisTest() {
BorkerAccessControl accessControl = new BorkerAccessControl();
accessControl.setSendMessage(false);
Map<Integer, Boolean> map = accessContralAnalysis.analysis(accessControl);
Iterator<Entry<Integer, Boolean>> it = map.entrySet().iterator();
long num = 0;
while (it.hasNext()) {
Entry<Integer, Boolean> e = it.next();
if (!e.getValue()) {
Assert.assertEquals(e.getKey(), Integer.valueOf(10));
num++;
}
}
Assert.assertEquals(num, 1);
}
@Test(expected = AclPlugRuntimeException.class)
public void analysisExceptionTest() {
AccessControl accessControl = new AccessControl();
accessContralAnalysis.analysis(accessControl);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug;
public class AclPlugControllerTest {
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class AclUtilsTest {
@Test
public void getAddreeStrArray() {
String address = "1.1.1.{1,2,3,4}";
String[] addressArray = AclUtils.getAddreeStrArray(address, "{1,2,3,4}");
List<String> newAddressList = new ArrayList<>();
for (String a : addressArray) {
newAddressList.add(a);
}
List<String> addressList = new ArrayList<>();
addressList.add("1.1.1.1");
addressList.add("1.1.1.2");
addressList.add("1.1.1.3");
addressList.add("1.1.1.4");
Assert.assertEquals(newAddressList, addressList);
}
@Test
public void isScopeStringArray() {
String adderss = "12";
for (int i = 0; i < 6; i++) {
boolean isScope = AclUtils.isScope(adderss, 4);
if (i == 3) {
Assert.assertTrue(isScope);
} else {
Assert.assertFalse(isScope);
}
adderss = adderss + ".12";
}
}
@Test
public void isScopeArray() {
String[] adderss = StringUtils.split("12.12.12.12", ".");
boolean isScope = AclUtils.isScope(adderss, 4);
Assert.assertTrue(isScope);
isScope = AclUtils.isScope(adderss, 3);
Assert.assertTrue(isScope);
adderss = StringUtils.split("12.12.1222.1222", ".");
isScope = AclUtils.isScope(adderss, 4);
Assert.assertFalse(isScope);
isScope = AclUtils.isScope(adderss, 3);
Assert.assertFalse(isScope);
}
@Test
public void isScopeStringTest() {
for (int i = 0; i < 256; i++) {
boolean isScope = AclUtils.isScope(i + "");
Assert.assertTrue(isScope);
}
boolean isScope = AclUtils.isScope("-1");
Assert.assertFalse(isScope);
isScope = AclUtils.isScope("256");
Assert.assertFalse(isScope);
}
@Test
public void isScopeTest() {
for (int i = 0; i < 256; i++) {
boolean isScope = AclUtils.isScope(i);
Assert.assertTrue(isScope);
}
boolean isScope = AclUtils.isScope(-1);
Assert.assertFalse(isScope);
isScope = AclUtils.isScope(256);
Assert.assertFalse(isScope);
}
@Test
public void isAsteriskTest() {
boolean isAsterisk = AclUtils.isAsterisk("*");
Assert.assertTrue(isAsterisk);
isAsterisk = AclUtils.isAsterisk(",");
Assert.assertFalse(isAsterisk);
}
@Test
public void isColonTest() {
boolean isColon = AclUtils.isColon(",");
Assert.assertTrue(isColon);
isColon = AclUtils.isColon("-");
Assert.assertFalse(isColon);
}
@Test
public void isMinusTest() {
boolean isMinus = AclUtils.isMinus("-");
Assert.assertTrue(isMinus);
isMinus = AclUtils.isMinus("*");
Assert.assertFalse(isMinus);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.entity.AuthenticationInfo;
import org.apache.rocketmq.acl.plug.entity.AuthenticationResult;
import org.apache.rocketmq.acl.plug.entity.BorkerAccessControl;
import org.apache.rocketmq.acl.plug.strategy.NetaddressStrategyFactory;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class AuthenticationTest {
Authentication authentication = new Authentication();
AuthenticationInfo authenticationInfo;
BorkerAccessControl borkerAccessControl;
AuthenticationResult authenticationResult = new AuthenticationResult();
AccessControl accessControl = new AccessControl();
@Before
public void init() {
borkerAccessControl = new BorkerAccessControl();
//321
borkerAccessControl.setQueryConsumeQueue(false);
Set<String> permitSendTopic = new HashSet<>();
permitSendTopic.add("permitSendTopic");
borkerAccessControl.setPermitSendTopic(permitSendTopic);
Set<String> noPermitSendTopic = new HashSet<>();
noPermitSendTopic.add("noPermitSendTopic");
borkerAccessControl.setNoPermitSendTopic(noPermitSendTopic);
Set<String> permitPullTopic = new HashSet<>();
permitPullTopic.add("permitPullTopic");
borkerAccessControl.setPermitPullTopic(permitPullTopic);
Set<String> noPermitPullTopic = new HashSet<>();
noPermitPullTopic.add("noPermitPullTopic");
borkerAccessControl.setNoPermitPullTopic(noPermitPullTopic);
AccessContralAnalysis accessContralAnalysis = new AccessContralAnalysis();
accessContralAnalysis.analysisClass(RequestCode.class);
Map<Integer, Boolean> map = accessContralAnalysis.analysis(borkerAccessControl);
authenticationInfo = new AuthenticationInfo(map, borkerAccessControl, NetaddressStrategyFactory.NULL_NET_ADDRESS_STRATEGY);
}
@Test
public void authenticationTest() {
accessControl.setCode(317);
boolean isReturn = authentication.authentication(authenticationInfo, accessControl, authenticationResult);
Assert.assertTrue(isReturn);
accessControl.setCode(321);
isReturn = authentication.authentication(authenticationInfo, accessControl, authenticationResult);
Assert.assertFalse(isReturn);
accessControl.setCode(10);
accessControl.setTopic("permitSendTopic");
isReturn = authentication.authentication(authenticationInfo, accessControl, authenticationResult);
Assert.assertTrue(isReturn);
accessControl.setCode(310);
isReturn = authentication.authentication(authenticationInfo, accessControl, authenticationResult);
Assert.assertTrue(isReturn);
accessControl.setCode(320);
isReturn = authentication.authentication(authenticationInfo, accessControl, authenticationResult);
Assert.assertTrue(isReturn);
accessControl.setTopic("noPermitSendTopic");
isReturn = authentication.authentication(authenticationInfo, accessControl, authenticationResult);
Assert.assertFalse(isReturn);
accessControl.setTopic("nopermitSendTopic");
isReturn = authentication.authentication(authenticationInfo, accessControl, authenticationResult);
Assert.assertFalse(isReturn);
accessControl.setCode(11);
accessControl.setTopic("permitPullTopic");
isReturn = authentication.authentication(authenticationInfo, accessControl, authenticationResult);
Assert.assertTrue(isReturn);
accessControl.setTopic("noPermitPullTopic");
isReturn = authentication.authentication(authenticationInfo, accessControl, authenticationResult);
Assert.assertFalse(isReturn);
accessControl.setTopic("nopermitPullTopic");
isReturn = authentication.authentication(authenticationInfo, accessControl, authenticationResult);
Assert.assertFalse(isReturn);
}
@Test
public void isEmptyTest() {
accessControl.setCode(10);
accessControl.setTopic("absentTopic");
boolean isReturn = authentication.authentication(authenticationInfo, accessControl, authenticationResult);
Assert.assertFalse(isReturn);
Set<String> permitSendTopic = new HashSet<>();
borkerAccessControl.setPermitSendTopic(permitSendTopic);
isReturn = authentication.authentication(authenticationInfo, accessControl, authenticationResult);
Assert.assertTrue(isReturn);
accessControl.setCode(11);
isReturn = authentication.authentication(authenticationInfo, accessControl, authenticationResult);
Assert.assertFalse(isReturn);
borkerAccessControl.setPermitPullTopic(permitSendTopic);
isReturn = authentication.authentication(authenticationInfo, accessControl, authenticationResult);
Assert.assertTrue(isReturn);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug.engine;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.entity.AuthenticationInfo;
import org.apache.rocketmq.acl.plug.entity.AuthenticationResult;
import org.apache.rocketmq.acl.plug.entity.BorkerAccessControl;
import org.apache.rocketmq.acl.plug.entity.BorkerAccessControlTransport;
import org.apache.rocketmq.acl.plug.entity.ControllerParameters;
import org.apache.rocketmq.acl.plug.entity.LoginInfo;
import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
import org.apache.rocketmq.common.MixAll;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.internal.util.reflection.FieldSetter;
import org.mockito.junit.MockitoJUnitRunner;
import org.yaml.snakeyaml.Yaml;
@RunWith(MockitoJUnitRunner.class)
public class PlainAclPlugEngineTest {
PlainAclPlugEngine plainAclPlugEngine;
BorkerAccessControlTransport transport;
AccessControl accessControl;
AccessControl accessControlTwo;
Map<String, LoginInfo> loginInfoMap;
@Before
public void init() throws NoSuchFieldException, SecurityException, IOException {
Yaml ymal = new Yaml();
String home = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
InputStream fis = null;
if (home == null) {
URL url = PlainAclPlugEngineTest.class.getResource("/");
home = url.toString();
home = home.substring(0, home.length() - 1).replace("file:/", "").replace("target/test-classes", "");
home = home + "src/test/resources";
if (!new File(home + "/conf/transport.yml").exists()) {
home = "/home/travis/build/githublaohu/rocketmq/acl-plug/src/test/resources";
}
}
String filePath = home + "/conf/transport.yml";
try {
fis = new FileInputStream(new File(filePath));
transport = ymal.loadAs(fis, BorkerAccessControlTransport.class);
}catch(Exception e) {
AccessControl accessControl = new BorkerAccessControl();
accessControl.setAccount("onlyNetAddress");
accessControl.setPassword("aliyun11");
accessControl.setNetaddress("127.0.0.1");
accessControl.setRecognition("127.0.0.1:1");
AccessControl accessControlTwo = new BorkerAccessControl();
accessControlTwo.setAccount("listTransport");
accessControlTwo.setPassword("aliyun1");
accessControlTwo.setNetaddress("127.0.0.1");
accessControlTwo.setRecognition("127.0.0.1:2");
transport = new BorkerAccessControlTransport();
transport.setOnlyNetAddress((BorkerAccessControl)accessControl);
}
ControllerParameters controllerParametersEntity = new ControllerParameters();
controllerParametersEntity.setFileHome(null);
try {
plainAclPlugEngine = new PlainAclPlugEngine(controllerParametersEntity);
plainAclPlugEngine.initialize();
} catch (Exception e) {
}
accessControl = new BorkerAccessControl();
accessControl.setAccount("rokcetmq");
accessControl.setPassword("aliyun11");
accessControl.setNetaddress("127.0.0.1");
accessControl.setRecognition("127.0.0.1:1");
accessControlTwo = new BorkerAccessControl();
accessControlTwo.setAccount("rokcet1");
accessControlTwo.setPassword("aliyun1");
accessControlTwo.setNetaddress("127.0.0.1");
accessControlTwo.setRecognition("127.0.0.1:2");
loginInfoMap = new ConcurrentHashMap<>();
FieldSetter.setField(plainAclPlugEngine, plainAclPlugEngine.getClass().getSuperclass().getDeclaredField("loginInfoMap"), loginInfoMap);
}
@Test(expected = AclPlugRuntimeException.class)
public void accountNullTest() {
accessControl.setAccount(null);
plainAclPlugEngine.setAccessControl(accessControl);
}
@Test(expected = AclPlugRuntimeException.class)
public void accountThanTest() {
accessControl.setAccount("123");
plainAclPlugEngine.setAccessControl(accessControl);
}
@Test(expected = AclPlugRuntimeException.class)
public void passWordtNullTest() {
accessControl.setAccount(null);
plainAclPlugEngine.setAccessControl(accessControl);
}
@Test(expected = AclPlugRuntimeException.class)
public void passWordThanTest() {
accessControl.setAccount("123");
plainAclPlugEngine.setAccessControl(accessControl);
}
@Test(expected = AclPlugRuntimeException.class)
public void testPlainAclPlugEngineInit() {
ControllerParameters controllerParametersEntity = new ControllerParameters();
new PlainAclPlugEngine(controllerParametersEntity).initialize();
}
@Test
public void authenticationInfoOfSetAccessControl() {
AuthenticationInfoManagementAclPlugEngine aclPlugEngine = (AuthenticationInfoManagementAclPlugEngine) plainAclPlugEngine;
aclPlugEngine.setAccessControl(accessControl);
AuthenticationInfo authenticationInfo = aclPlugEngine.getAccessControl(accessControl);
AccessControl getAccessControl = authenticationInfo.getAccessControl();
Assert.assertEquals(accessControl, getAccessControl);
AccessControl testAccessControl = new AccessControl();
testAccessControl.setAccount("rokcetmq");
testAccessControl.setPassword("aliyun11");
testAccessControl.setNetaddress("127.0.0.1");
testAccessControl.setRecognition("127.0.0.1:1");
testAccessControl.setAccount("rokcetmq1");
authenticationInfo = aclPlugEngine.getAccessControl(testAccessControl);
Assert.assertNull(authenticationInfo);
testAccessControl.setAccount("rokcetmq");
testAccessControl.setPassword("1234567");
authenticationInfo = aclPlugEngine.getAccessControl(testAccessControl);
Assert.assertNull(authenticationInfo);
testAccessControl.setNetaddress("127.0.0.2");
authenticationInfo = aclPlugEngine.getAccessControl(testAccessControl);
Assert.assertNull(authenticationInfo);
}
@Test
public void setAccessControlList() {
List<AccessControl> accessControlList = new ArrayList<>();
accessControlList.add(accessControl);
accessControlList.add(accessControlTwo);
plainAclPlugEngine.setAccessControlList(accessControlList);
AuthenticationInfoManagementAclPlugEngine aclPlugEngine = (AuthenticationInfoManagementAclPlugEngine) plainAclPlugEngine;
AuthenticationInfo newAccessControl = aclPlugEngine.getAccessControl(accessControl);
Assert.assertEquals(accessControl, newAccessControl.getAccessControl());
newAccessControl = aclPlugEngine.getAccessControl(accessControlTwo);
Assert.assertEquals(accessControlTwo, newAccessControl.getAccessControl());
}
@Test
public void setNetaddressAccessControl() {
AuthenticationInfoManagementAclPlugEngine aclPlugEngine = (AuthenticationInfoManagementAclPlugEngine) plainAclPlugEngine;
AccessControl accessControl = new BorkerAccessControl();
accessControl.setAccount("RocketMQ");
accessControl.setPassword("RocketMQ");
accessControl.setNetaddress("127.0.0.1");
aclPlugEngine.setAccessControl(accessControl);
aclPlugEngine.setNetaddressAccessControl(accessControl);
AuthenticationInfo authenticationInfo = aclPlugEngine.getAccessControl(accessControl);
AccessControl getAccessControl = authenticationInfo.getAccessControl();
Assert.assertEquals(accessControl, getAccessControl);
accessControl.setNetaddress("127.0.0.2");
authenticationInfo = aclPlugEngine.getAccessControl(accessControl);
Assert.assertNull(authenticationInfo);
}
public void eachCheckLoginAndAuthentication() {
}
@Test(expected = AclPlugRuntimeException.class)
public void borkerAccessControlTransportTestNull() {
plainAclPlugEngine.setBorkerAccessControlTransport(new BorkerAccessControlTransport());
}
@Test
public void borkerAccessControlTransportTest() {
BorkerAccessControlTransport borkerAccessControlTransprt = new BorkerAccessControlTransport();
borkerAccessControlTransprt.setOnlyNetAddress((BorkerAccessControl) this.accessControl);
List<BorkerAccessControl> list = new ArrayList<>();
list.add((BorkerAccessControl) this.accessControlTwo);
borkerAccessControlTransprt.setList(list);
plainAclPlugEngine.setBorkerAccessControlTransport(borkerAccessControlTransprt);
AuthenticationInfoManagementAclPlugEngine aclPlugEngine = (AuthenticationInfoManagementAclPlugEngine) plainAclPlugEngine;
AccessControl accessControl = new BorkerAccessControl();
accessControl.setAccount("RocketMQ");
accessControl.setPassword("RocketMQ");
accessControl.setNetaddress("127.0.0.1");
aclPlugEngine.setAccessControl(accessControl);
AuthenticationInfo authenticationInfo = aclPlugEngine.getAccessControl(accessControl);
Assert.assertNotNull(authenticationInfo.getAccessControl());
authenticationInfo = aclPlugEngine.getAccessControl(accessControlTwo);
Assert.assertEquals(accessControlTwo, authenticationInfo.getAccessControl());
}
@Test
public void getLoginInfo() {
plainAclPlugEngine.setAccessControl(accessControl);
LoginInfo loginInfo = plainAclPlugEngine.getLoginInfo(accessControl);
Assert.assertNotNull(loginInfo);
loginInfo = plainAclPlugEngine.getLoginInfo(accessControlTwo);
Assert.assertNull(loginInfo);
}
@Test
public void deleteLoginInfo() {
plainAclPlugEngine.setAccessControl(accessControl);
plainAclPlugEngine.getLoginInfo(accessControl);
LoginInfo loginInfo = loginInfoMap.get(accessControl.getRecognition());
Assert.assertNotNull(loginInfo);
plainAclPlugEngine.deleteLoginInfo(accessControl.getRecognition());
loginInfo = loginInfoMap.get(accessControl.getRecognition());
Assert.assertNull(loginInfo);
}
@Test
public void getAuthenticationInfo() {
AccessControl newAccessControl = new AccessControl();
newAccessControl.setAccount("rokcetmq");
newAccessControl.setPassword("aliyun11");
newAccessControl.setNetaddress("127.0.0.1");
newAccessControl.setRecognition("127.0.0.1:1");
AuthenticationResult authenticationResult = new AuthenticationResult();
plainAclPlugEngine.getAuthenticationInfo(newAccessControl, authenticationResult);
Assert.assertEquals("Login information does not exist, Please check login, password, IP", authenticationResult.getResultString());
plainAclPlugEngine.setAccessControl(accessControl);
AuthenticationInfo authenticationInfo = plainAclPlugEngine.getAuthenticationInfo(newAccessControl, authenticationResult);
Assert.assertNotNull(authenticationInfo);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug.strategy;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
import org.junit.Assert;
import org.junit.Test;
public class NetaddressStrategyTest {
NetaddressStrategyFactory netaddressStrategyFactory = new NetaddressStrategyFactory();
@Test
public void NetaddressStrategyFactoryTest() {
AccessControl accessControl = new AccessControl();
NetaddressStrategy netaddressStrategy = netaddressStrategyFactory.getNetaddressStrategy(accessControl);
Assert.assertEquals(netaddressStrategy, NetaddressStrategyFactory.NULL_NET_ADDRESS_STRATEGY);
accessControl.setNetaddress("*");
netaddressStrategy = netaddressStrategyFactory.getNetaddressStrategy(accessControl);
Assert.assertEquals(netaddressStrategy, NetaddressStrategyFactory.NULL_NET_ADDRESS_STRATEGY);
accessControl.setNetaddress("127.0.0.1");
netaddressStrategy = netaddressStrategyFactory.getNetaddressStrategy(accessControl);
Assert.assertEquals(netaddressStrategy.getClass(), NetaddressStrategyFactory.OneNetaddressStrategy.class);
accessControl.setNetaddress("127.0.0.1,127.0.0.2,127.0.0.3");
netaddressStrategy = netaddressStrategyFactory.getNetaddressStrategy(accessControl);
Assert.assertEquals(netaddressStrategy.getClass(), NetaddressStrategyFactory.MultipleNetaddressStrategy.class);
accessControl.setNetaddress("127.0.0.{1,2,3}");
netaddressStrategy = netaddressStrategyFactory.getNetaddressStrategy(accessControl);
Assert.assertEquals(netaddressStrategy.getClass(), NetaddressStrategyFactory.MultipleNetaddressStrategy.class);
accessControl.setNetaddress("127.0.0.1-200");
netaddressStrategy = netaddressStrategyFactory.getNetaddressStrategy(accessControl);
Assert.assertEquals(netaddressStrategy.getClass(), NetaddressStrategyFactory.RangeNetaddressStrategy.class);
accessControl.setNetaddress("127.0.0.*");
netaddressStrategy = netaddressStrategyFactory.getNetaddressStrategy(accessControl);
Assert.assertEquals(netaddressStrategy.getClass(), NetaddressStrategyFactory.RangeNetaddressStrategy.class);
accessControl.setNetaddress("127.0.1-20.*");
netaddressStrategy = netaddressStrategyFactory.getNetaddressStrategy(accessControl);
Assert.assertEquals(netaddressStrategy.getClass(), NetaddressStrategyFactory.RangeNetaddressStrategy.class);
}
@Test(expected = AclPlugRuntimeException.class)
public void verifyTest() {
AccessControl accessControl = new AccessControl();
accessControl.setNetaddress("127.0.0.1");
netaddressStrategyFactory.getNetaddressStrategy(accessControl);
accessControl.setNetaddress("256.0.0.1");
netaddressStrategyFactory.getNetaddressStrategy(accessControl);
}
@Test
public void nullNetaddressStrategyTest() {
boolean isMatch = NetaddressStrategyFactory.NULL_NET_ADDRESS_STRATEGY.match(new AccessControl());
Assert.assertTrue(isMatch);
}
public void oneNetaddressStrategyTest() {
AccessControl accessControl = new AccessControl();
accessControl.setNetaddress("127.0.0.1");
NetaddressStrategy netaddressStrategy = netaddressStrategyFactory.getNetaddressStrategy(accessControl);
accessControl.setNetaddress("");
boolean match = netaddressStrategy.match(accessControl);
Assert.assertFalse(match);
accessControl.setNetaddress("127.0.0.2");
match = netaddressStrategy.match(accessControl);
Assert.assertFalse(match);
accessControl.setNetaddress("127.0.0.1");
match = netaddressStrategy.match(accessControl);
Assert.assertTrue(match);
}
@Test
public void multipleNetaddressStrategyTest() {
AccessControl accessControl = new AccessControl();
accessControl.setNetaddress("127.0.0.1,127.0.0.2,127.0.0.3");
NetaddressStrategy netaddressStrategy = netaddressStrategyFactory.getNetaddressStrategy(accessControl);
multipleNetaddressStrategyTest(netaddressStrategy);
accessControl.setNetaddress("127.0.0.{1,2,3}");
netaddressStrategy = netaddressStrategyFactory.getNetaddressStrategy(accessControl);
multipleNetaddressStrategyTest(netaddressStrategy);
}
@Test(expected = AclPlugRuntimeException.class)
public void multipleNetaddressStrategyExceptionTest() {
AccessControl accessControl = new AccessControl();
accessControl.setNetaddress("127.0.0.1,2,3}");
netaddressStrategyFactory.getNetaddressStrategy(accessControl);
}
private void multipleNetaddressStrategyTest(NetaddressStrategy netaddressStrategy) {
AccessControl accessControl = new AccessControl();
accessControl.setNetaddress("127.0.0.1");
boolean match = netaddressStrategy.match(accessControl);
Assert.assertTrue(match);
accessControl.setNetaddress("127.0.0.2");
match = netaddressStrategy.match(accessControl);
Assert.assertTrue(match);
accessControl.setNetaddress("127.0.0.3");
match = netaddressStrategy.match(accessControl);
Assert.assertTrue(match);
accessControl.setNetaddress("127.0.0.4");
match = netaddressStrategy.match(accessControl);
Assert.assertFalse(match);
accessControl.setNetaddress("127.0.0.0");
match = netaddressStrategy.match(accessControl);
Assert.assertFalse(match);
}
@Test
public void rangeNetaddressStrategyTest() {
String head = "127.0.0.";
AccessControl accessControl = new AccessControl();
accessControl.setNetaddress("127.0.0.1-200");
NetaddressStrategy netaddressStrategy = netaddressStrategyFactory.getNetaddressStrategy(accessControl);
rangeNetaddressStrategyTest(netaddressStrategy, head, 1, 200, true);
accessControl.setNetaddress("127.0.0.*");
netaddressStrategy = netaddressStrategyFactory.getNetaddressStrategy(accessControl);
rangeNetaddressStrategyTest(netaddressStrategy, head, 0, 255, true);
accessControl.setNetaddress("127.0.1-200.*");
netaddressStrategy = netaddressStrategyFactory.getNetaddressStrategy(accessControl);
rangeNetaddressStrategyThirdlyTest(netaddressStrategy, head, 1, 200);
}
private void rangeNetaddressStrategyTest(NetaddressStrategy netaddressStrategy, String head, int start, int end,
boolean isFalse) {
AccessControl accessControl = new AccessControl();
for (int i = -10; i < 300; i++) {
accessControl.setNetaddress(head + i);
boolean match = netaddressStrategy.match(accessControl);
if (isFalse && i >= start && i <= end) {
Assert.assertTrue(match);
continue;
}
Assert.assertFalse(match);
}
}
private void rangeNetaddressStrategyThirdlyTest(NetaddressStrategy netaddressStrategy, String head, int start,
int end) {
String newHead;
for (int i = -10; i < 300; i++) {
newHead = head + i;
if (i >= start && i <= end) {
rangeNetaddressStrategyTest(netaddressStrategy, newHead, 0, 255, false);
}
}
}
@Test(expected = AclPlugRuntimeException.class)
public void rangeNetaddressStrategyExceptionStartGreaterEndTest() {
rangeNetaddressStrategyExceptionTest("127.0.0.2-1");
}
@Test(expected = AclPlugRuntimeException.class)
public void rangeNetaddressStrategyExceptionScopeTest() {
rangeNetaddressStrategyExceptionTest("127.0.0.-1-200");
}
@Test(expected = AclPlugRuntimeException.class)
public void rangeNetaddressStrategyExceptionScopeTwoTest() {
rangeNetaddressStrategyExceptionTest("127.0.0.0-256");
}
private void rangeNetaddressStrategyExceptionTest(String netaddress) {
AccessControl accessControl = new AccessControl();
accessControl.setNetaddress(netaddress);
netaddressStrategyFactory.getNetaddressStrategy(accessControl);
}
}
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
onlyNetAddress:
netaddress: 10.10.103.*
noPermitPullTopic:
- broker-a
list:
- account: RocketMQ
password: 1234567
netaddress: 192.0.0.*
permitSendTopic:
- test1
- test2
- account: RocketMQ
password: 1234567
netaddress: 192.0.2.1
permitSendTopic:
- test3
- test4
\ No newline at end of file
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. The ASF licenses this file to
You under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
......@@ -52,6 +48,10 @@
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-filter</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-acl-plug</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
......
......@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
......@@ -31,6 +32,11 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.plug.AclPlugController;
import org.apache.rocketmq.acl.plug.AclRemotingService;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.entity.ControllerParameters;
import org.apache.rocketmq.broker.client.ClientHousekeepingService;
import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
import org.apache.rocketmq.broker.client.ConsumerManager;
......@@ -91,6 +97,7 @@ import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.srvutil.FileWatchService;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageArrivingListener;
......@@ -157,6 +164,8 @@ public class BrokerController {
private TransactionalMessageService transactionalMessageService;
private AbstractTransactionalMessageCheckListener transactionalMessageCheckListener;
private AclPlugController aclPlugController;
public BrokerController(
final BrokerConfig brokerConfig,
final NettyServerConfig nettyServerConfig,
......@@ -467,6 +476,7 @@ public class BrokerController {
}
}
initialTransaction();
initialAclPlug();
}
return result;
}
......@@ -486,6 +496,47 @@ public class BrokerController {
this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
}
private void initialAclPlug() {
try {
if (!this.brokerConfig.isAclPlug()) {
log.info("Default does not start acl plug");
return;
}
ControllerParameters controllerParameters = new ControllerParameters();
controllerParameters.setFileHome(brokerConfig.getRocketmqHome());
aclPlugController = new AclPlugController(controllerParameters);
if (!aclPlugController.isStartSucceed()) {
log.error("start acl plug failure");
return;
}
final AclRemotingService aclRemotingService = aclPlugController.getAclRemotingService();
this.registerServerRPCHook(new RPCHook() {
@Override
public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
HashMap<String, String> extFields = request.getExtFields();
AccessControl accessControl = new AccessControl();
accessControl.setCode(request.getCode());
accessControl.setRecognition(remoteAddr);
if (extFields != null) {
accessControl.setAccount(extFields.get("account"));
accessControl.setPassword(extFields.get("password"));
accessControl.setNetaddress(StringUtils.split(remoteAddr, ":")[0]);
accessControl.setTopic(extFields.get("topic"));
}
aclRemotingService.check(accessControl);
}
@Override
public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
}
});
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
public void registerProcessor() {
/**
* SendMessageProcessor
......@@ -1049,7 +1100,12 @@ public class BrokerController {
this.transactionalMessageCheckListener = transactionalMessageCheckListener;
}
public AclPlugController getAclPlugController() {
return this.aclPlugController;
}
public BlockingQueue<Runnable> getEndTransactionThreadPoolQueue() {
return endTransactionThreadPoolQueue;
}
}
......@@ -72,6 +72,9 @@ public class ClientHousekeepingService implements ChannelEventListener {
this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);
this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
if (this.brokerController.getAclPlugController() != null && this.brokerController.getAclPlugController().isStartSucceed()) {
this.brokerController.getAclPlugController().doChannelCloseEvent(remoteAddr);
}
}
@Override
......
......@@ -171,6 +171,18 @@ public class BrokerConfig {
@ImportantField
private long transactionCheckInterval = 60 * 1000;
private boolean isAclPlug;
public static String localHostName() {
try {
return InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
log.error("Failed to obtain the host name", e);
}
return "DEFAULT_BROKER";
}
public boolean isTraceOn() {
return traceOn;
}
......@@ -235,16 +247,6 @@ public class BrokerConfig {
this.slaveReadEnable = slaveReadEnable;
}
public static String localHostName() {
try {
return InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
log.error("Failed to obtain the host name", e);
}
return "DEFAULT_BROKER";
}
public int getRegisterBrokerTimeoutMills() {
return registerBrokerTimeoutMills;
}
......@@ -709,6 +711,14 @@ public class BrokerConfig {
this.transactionCheckInterval = transactionCheckInterval;
}
public boolean isAclPlug() {
return isAclPlug;
}
public void setAclPlug(boolean isAclPlug) {
this.isAclPlug = isAclPlug;
}
public int getEndTransactionThreadPoolNums() {
return endTransactionThreadPoolNums;
}
......@@ -732,4 +742,5 @@ public class BrokerConfig {
public void setWaitTimeMillsInTransactionQueue(long waitTimeMillsInTransactionQueue) {
this.waitTimeMillsInTransactionQueue = waitTimeMillsInTransactionQueue;
}
}
......@@ -36,4 +36,5 @@ public class LoggerName {
public static final String PROTECTION_LOGGER_NAME = "RocketmqProtection";
public static final String WATER_MARK_LOGGER_NAME = "RocketmqWaterMark";
public static final String FILTER_LOGGER_NAME = "RocketmqFilter";
public static final String ACL_PLUG_LOGGER_NAME = "RocketmqAclPlug";
}
......@@ -17,14 +17,13 @@
package org.apache.rocketmq.common;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
......
......@@ -20,3 +20,5 @@ deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
aclPlug=true
namesrvAddr=127.0.0.1:9876
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
onlyNetAddress:
netaddress: 10.10.103.*
noPermitPullTopic:
- broker-a
list:
- account: RocketMQ
password: 1234567
netaddress: 192.0.0.*
permitSendTopic:
- test1
- test2
- account: RocketMQ
password: 1234567
netaddress: 192.0.2.1
permitSendTopic:
- test3
- test4
\ No newline at end of file
......@@ -29,10 +29,10 @@ public class PullConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.start();
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1");
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("broker-a");
for (MessageQueue mq : mqs) {
System.out.printf("Consume from the queue: %s%n", mq);
SINGLE_MQ:
......
......@@ -24,6 +24,7 @@ import org.apache.rocketmq.common.message.MessageQueue;
public class PullConsumerTest {
public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.start();
try {
......
......@@ -16,7 +16,8 @@
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
<groupId>org.apache</groupId>
......@@ -125,6 +126,7 @@
<module>distribution</module>
<module>openmessaging</module>
<module>logging</module>
<module>acl-plug</module>
</modules>
<build>
......@@ -157,7 +159,7 @@
</executions>
<configuration>
<rules>
<banCircularDependencies />
<banCircularDependencies/>
</rules>
<fail>true</fail>
</configuration>
......@@ -521,6 +523,11 @@
<artifactId>rocketmq-example</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-acl-plug</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册