提交 37913d8e 编写于 作者: H huzongtang

add acl model to snode branch,adjust some codes and pass all of unit test cases.

上级 218ed2d9
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. The ASF licenses this file to
You under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.4.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
<artifactId>rocketmq-acl</artifactId>
<name>rocketmq-acl ${project.version}</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-remoting</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-logging</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-common</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-srvutil</artifactId>
</dependency>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl;
public interface AccessResource {
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public interface AccessValidator {
/**
* Parse to get the AccessResource(user, resource, needed permission)
*
* @param request
* @param remoteAddr
* @return Plain access resource result,include access key,signature and some other access attributes.
*/
AccessResource parse(RemotingCommand request, String remoteAddr);
/**
* Validate the access resource.
*
* @param accessResource
*/
void validate(AccessResource accessResource);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.common;
import java.lang.reflect.Field;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import static org.apache.rocketmq.acl.common.SessionCredentials.ACCESS_KEY;
import static org.apache.rocketmq.acl.common.SessionCredentials.SECURITY_TOKEN;
import static org.apache.rocketmq.acl.common.SessionCredentials.SIGNATURE;
public class AclClientRPCHook implements RPCHook {
private final SessionCredentials sessionCredentials;
protected ConcurrentHashMap<Class<? extends CommandCustomHeader>, Field[]> fieldCache =
new ConcurrentHashMap<Class<? extends CommandCustomHeader>, Field[]>();
public AclClientRPCHook(SessionCredentials sessionCredentials) {
this.sessionCredentials = sessionCredentials;
}
@Override
public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
byte[] total = AclUtils.combineRequestContent(request,
parseRequestContent(request, sessionCredentials.getAccessKey(), sessionCredentials.getSecurityToken()));
String signature = AclUtils.calSignature(total, sessionCredentials.getSecretKey());
request.addExtField(SIGNATURE, signature);
request.addExtField(ACCESS_KEY, sessionCredentials.getAccessKey());
// The SecurityToken value is unneccessary,user can choose this one.
if (sessionCredentials.getSecurityToken() != null) {
request.addExtField(SECURITY_TOKEN, sessionCredentials.getSecurityToken());
}
}
@Override
public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
}
protected SortedMap<String, String> parseRequestContent(RemotingCommand request, String ak, String securityToken) {
CommandCustomHeader header = request.readCustomHeader();
// Sort property
SortedMap<String, String> map = new TreeMap<String, String>();
map.put(ACCESS_KEY, ak);
if (securityToken != null) {
map.put(SECURITY_TOKEN, securityToken);
}
try {
// Add header properties
if (null != header) {
Field[] fields = fieldCache.get(header.getClass());
if (null == fields) {
fields = header.getClass().getDeclaredFields();
for (Field field : fields) {
field.setAccessible(true);
}
Field[] tmp = fieldCache.putIfAbsent(header.getClass(), fields);
if (null != tmp) {
fields = tmp;
}
}
for (Field field : fields) {
Object value = field.get(header);
if (null != value && !field.isSynthetic()) {
map.put(field.getName(), value.toString());
}
}
}
return map;
} catch (Exception e) {
throw new RuntimeException("incompatible exception.", e);
}
}
public SessionCredentials getSessionCredentials() {
return sessionCredentials;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.common;
public class AclException extends RuntimeException {
private static final long serialVersionUID = -7256002576788700354L;
private String status;
private int code;
public AclException(String status, int code) {
super();
this.status = status;
this.code = code;
}
public AclException(String status, int code, String message) {
super(message);
this.status = status;
this.code = code;
}
public AclException(String message) {
super(message);
}
public AclException(String message, Throwable throwable) {
super(message, throwable);
}
public AclException(String status, int code, String message, Throwable throwable) {
super(message, throwable);
this.status = status;
this.code = code;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.common;
import java.nio.charset.Charset;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import org.apache.commons.codec.binary.Base64;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
public class AclSigner {
public static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");
public static final SigningAlgorithm DEFAULT_ALGORITHM = SigningAlgorithm.HmacSHA1;
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.ROCKETMQ_AUTHORIZE_LOGGER_NAME);
private static final int CAL_SIGNATURE_FAILED = 10015;
private static final String CAL_SIGNATURE_FAILED_MSG = "[%s:signature-failed] unable to calculate a request signature. error=%s";
public static String calSignature(String data, String key) throws AclException {
return calSignature(data, key, DEFAULT_ALGORITHM, DEFAULT_CHARSET);
}
public static String calSignature(String data, String key, SigningAlgorithm algorithm,
Charset charset) throws AclException {
return signAndBase64Encode(data, key, algorithm, charset);
}
private static String signAndBase64Encode(String data, String key, SigningAlgorithm algorithm, Charset charset)
throws AclException {
try {
byte[] signature = sign(data.getBytes(charset), key.getBytes(charset), algorithm);
return new String(Base64.encodeBase64(signature), DEFAULT_CHARSET);
} catch (Exception e) {
String message = String.format(CAL_SIGNATURE_FAILED_MSG, CAL_SIGNATURE_FAILED, e.getMessage());
log.error(message, e);
throw new AclException("CAL_SIGNATURE_FAILED", CAL_SIGNATURE_FAILED, message, e);
}
}
private static byte[] sign(byte[] data, byte[] key, SigningAlgorithm algorithm) throws AclException {
try {
Mac mac = Mac.getInstance(algorithm.toString());
mac.init(new SecretKeySpec(key, algorithm.toString()));
return mac.doFinal(data);
} catch (Exception e) {
String message = String.format(CAL_SIGNATURE_FAILED_MSG, CAL_SIGNATURE_FAILED, e.getMessage());
log.error(message, e);
throw new AclException("CAL_SIGNATURE_FAILED", CAL_SIGNATURE_FAILED, message, e);
}
}
public static String calSignature(byte[] data, String key) throws AclException {
return calSignature(data, key, DEFAULT_ALGORITHM, DEFAULT_CHARSET);
}
public static String calSignature(byte[] data, String key, SigningAlgorithm algorithm,
Charset charset) throws AclException {
return signAndBase64Encode(data, key, algorithm, charset);
}
private static String signAndBase64Encode(byte[] data, String key, SigningAlgorithm algorithm, Charset charset)
throws AclException {
try {
byte[] signature = sign(data, key.getBytes(charset), algorithm);
return new String(Base64.encodeBase64(signature), DEFAULT_CHARSET);
} catch (Exception e) {
String message = String.format(CAL_SIGNATURE_FAILED_MSG, CAL_SIGNATURE_FAILED, e.getMessage());
log.error(message, e);
throw new AclException("CAL_SIGNATURE_FAILED", CAL_SIGNATURE_FAILED, message, e);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.common;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Map;
import java.util.SortedMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.yaml.snakeyaml.Yaml;
import static org.apache.rocketmq.acl.common.SessionCredentials.CHARSET;
public class AclUtils {
public static byte[] combineRequestContent(RemotingCommand request, SortedMap<String, String> fieldsMap) {
try {
StringBuilder sb = new StringBuilder("");
for (Map.Entry<String, String> entry : fieldsMap.entrySet()) {
if (!SessionCredentials.SIGNATURE.equals(entry.getKey())) {
sb.append(entry.getValue());
}
}
return AclUtils.combineBytes(sb.toString().getBytes(CHARSET), request.getBody());
} catch (Exception e) {
throw new RuntimeException("incompatible exception.", e);
}
}
public static byte[] combineBytes(byte[] b1, byte[] b2) {
int size = (null != b1 ? b1.length : 0) + (null != b2 ? b2.length : 0);
byte[] total = new byte[size];
if (null != b1)
System.arraycopy(b1, 0, total, 0, b1.length);
if (null != b2)
System.arraycopy(b2, 0, total, b1.length, b2.length);
return total;
}
public static String calSignature(byte[] data, String secretKey) {
String signature = AclSigner.calSignature(data, secretKey);
return signature;
}
public static void verify(String netaddress, int index) {
if (!AclUtils.isScope(netaddress, index)) {
throw new AclException(String.format("netaddress examine scope Exception netaddress is %s", netaddress));
}
}
public static String[] getAddreeStrArray(String netaddress, String four) {
String[] fourStrArray = StringUtils.split(four.substring(1, four.length() - 1), ",");
String address = netaddress.substring(0, netaddress.indexOf("{"));
String[] addreeStrArray = new String[fourStrArray.length];
for (int i = 0; i < fourStrArray.length; i++) {
addreeStrArray[i] = address + fourStrArray[i];
}
return addreeStrArray;
}
public static boolean isScope(String num, int index) {
String[] strArray = StringUtils.split(num, ".");
if (strArray.length != 4) {
return false;
}
return isScope(strArray, index);
}
public static boolean isScope(String[] num, int index) {
if (num.length <= index) {
}
for (int i = 0; i < index; i++) {
if (!isScope(num[i])) {
return false;
}
}
return true;
}
public static boolean isScope(String num) {
return isScope(Integer.valueOf(num.trim()));
}
public static boolean isScope(int num) {
return num >= 0 && num <= 255;
}
public static boolean isAsterisk(String asterisk) {
return asterisk.indexOf('*') > -1;
}
public static boolean isColon(String colon) {
return colon.indexOf(',') > -1;
}
public static boolean isMinus(String minus) {
return minus.indexOf('-') > -1;
}
public static <T> T getYamlDataObject(String path, Class<T> clazz) {
Yaml ymal = new Yaml();
FileInputStream fis = null;
try {
fis = new FileInputStream(new File(path));
return ymal.loadAs(fis, clazz);
} catch (Exception e) {
throw new AclException(String.format("The file for Plain mode was not found , paths %s", path), e);
} finally {
if (fis != null) {
try {
fis.close();
} catch (IOException e) {
throw new AclException("close transport fileInputStream Exception", e);
}
}
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.common;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.plain.PlainAccessResource;
import org.apache.rocketmq.common.protocol.RequestCode;
public class Permission {
public static final byte DENY = 1;
public static final byte ANY = 1 << 1;
public static final byte PUB = 1 << 2;
public static final byte SUB = 1 << 3;
public static final Set<Integer> ADMIN_CODE = new HashSet<Integer>();
static {
// UPDATE_AND_CREATE_TOPIC
ADMIN_CODE.add(RequestCode.UPDATE_AND_CREATE_TOPIC);
// UPDATE_BROKER_CONFIG
ADMIN_CODE.add(RequestCode.UPDATE_BROKER_CONFIG);
// DELETE_TOPIC_IN_BROKER
ADMIN_CODE.add(RequestCode.DELETE_TOPIC_IN_BROKER);
// UPDATE_AND_CREATE_SUBSCRIPTIONGROUP
ADMIN_CODE.add(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP);
// DELETE_SUBSCRIPTIONGROUP
ADMIN_CODE.add(RequestCode.DELETE_SUBSCRIPTIONGROUP);
}
public static boolean checkPermission(byte neededPerm, byte ownedPerm) {
if ((ownedPerm & DENY) > 0) {
return false;
}
if ((neededPerm & ANY) > 0) {
return ((ownedPerm & PUB) > 0) || ((ownedPerm & SUB) > 0);
}
return (neededPerm & ownedPerm) > 0;
}
public static byte parsePermFromString(String permString) {
if (permString == null) {
return Permission.DENY;
}
switch (permString.trim()) {
case "PUB":
return Permission.PUB;
case "SUB":
return Permission.SUB;
case "PUB|SUB":
return Permission.PUB | Permission.SUB;
case "SUB|PUB":
return Permission.PUB | Permission.SUB;
case "DENY":
return Permission.DENY;
default:
return Permission.DENY;
}
}
public static void parseResourcePerms(PlainAccessResource plainAccessResource, Boolean isTopic,
List<String> resources) {
if (resources == null || resources.isEmpty()) {
return;
}
for (String resource : resources) {
String[] items = StringUtils.split(resource, "=");
if (items.length == 2) {
plainAccessResource.addResourceAndPerm(isTopic ? items[0].trim() : PlainAccessResource.getRetryTopic(items[0].trim()), parsePermFromString(items[1].trim()));
} else {
throw new AclException(String.format("Parse resource permission failed for %s:%s", isTopic ? "topic" : "group", resource));
}
}
}
public static boolean needAdminPerm(Integer code) {
return ADMIN_CODE.contains(code);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.common;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Properties;
import org.apache.rocketmq.common.MixAll;
public class SessionCredentials {
public static final Charset CHARSET = Charset.forName("UTF-8");
public static final String ACCESS_KEY = "AccessKey";
public static final String SECRET_KEY = "SecretKey";
public static final String SIGNATURE = "Signature";
public static final String SECURITY_TOKEN = "SecurityToken";
public static final String KEY_FILE = System.getProperty("rocketmq.client.keyFile",
System.getProperty("user.home") + File.separator + "key");
private String accessKey;
private String secretKey;
private String securityToken;
private String signature;
public SessionCredentials() {
String keyContent = null;
try {
keyContent = MixAll.file2String(KEY_FILE);
} catch (IOException ignore) {
}
if (keyContent != null) {
Properties prop = MixAll.string2Properties(keyContent);
if (prop != null) {
this.updateContent(prop);
}
}
}
public SessionCredentials(String accessKey, String secretKey) {
this.accessKey = accessKey;
this.secretKey = secretKey;
}
public SessionCredentials(String accessKey, String secretKey, String securityToken) {
this(accessKey, secretKey);
this.securityToken = securityToken;
}
public void updateContent(Properties prop) {
{
String value = prop.getProperty(ACCESS_KEY);
if (value != null) {
this.accessKey = value.trim();
}
}
{
String value = prop.getProperty(SECRET_KEY);
if (value != null) {
this.secretKey = value.trim();
}
}
{
String value = prop.getProperty(SECURITY_TOKEN);
if (value != null) {
this.securityToken = value.trim();
}
}
}
public String getAccessKey() {
return accessKey;
}
public void setAccessKey(String accessKey) {
this.accessKey = accessKey;
}
public String getSecretKey() {
return secretKey;
}
public void setSecretKey(String secretKey) {
this.secretKey = secretKey;
}
public String getSignature() {
return signature;
}
public void setSignature(String signature) {
this.signature = signature;
}
public String getSecurityToken() {
return securityToken;
}
public void setSecurityToken(final String securityToken) {
this.securityToken = securityToken;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((accessKey == null) ? 0 : accessKey.hashCode());
result = prime * result + ((secretKey == null) ? 0 : secretKey.hashCode());
result = prime * result + ((signature == null) ? 0 : signature.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
SessionCredentials other = (SessionCredentials) obj;
if (accessKey == null) {
if (other.accessKey != null)
return false;
} else if (!accessKey.equals(other.accessKey))
return false;
if (secretKey == null) {
if (other.secretKey != null)
return false;
} else if (!secretKey.equals(other.secretKey))
return false;
if (signature == null) {
if (other.signature != null)
return false;
} else if (!signature.equals(other.signature))
return false;
return true;
}
@Override
public String toString() {
return "SessionCredentials [accessKey=" + accessKey + ", secretKey=" + secretKey + ", signature="
+ signature + ", SecurityToken=" + securityToken + "]";
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.common;
public enum SigningAlgorithm {
HmacSHA1,
HmacSHA256,
HmacMD5;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plain;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.rocketmq.acl.AccessResource;
import org.apache.rocketmq.common.MixAll;
public class PlainAccessResource implements AccessResource {
// Identify the user
private String accessKey;
private String secretKey;
private String whiteRemoteAddress;
private boolean admin;
private byte defaultTopicPerm = 1;
private byte defaultGroupPerm = 1;
private Map<String, Byte> resourcePermMap;
private RemoteAddressStrategy remoteAddressStrategy;
private int requestCode;
//the content to calculate the content
private byte[] content;
private String signature;
private String secretToken;
private String recognition;
public PlainAccessResource() {
}
public static boolean isRetryTopic(String topic) {
return null != topic && topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX);
}
public static String printStr(String resource, boolean isGroup) {
if (resource == null) {
return null;
}
if (isGroup) {
return String.format("%s:%s", "group", getGroupFromRetryTopic(resource));
} else {
return String.format("%s:%s", "topic", resource);
}
}
public static String getGroupFromRetryTopic(String retryTopic) {
if (retryTopic == null) {
return null;
}
return retryTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
}
public static String getRetryTopic(String group) {
if (group == null) {
return null;
}
return MixAll.getRetryTopic(group);
}
public void addResourceAndPerm(String resource, byte perm) {
if (resource == null) {
return;
}
if (resourcePermMap == null) {
resourcePermMap = new HashMap<>();
}
resourcePermMap.put(resource, perm);
}
public String getAccessKey() {
return accessKey;
}
public void setAccessKey(String accessKey) {
this.accessKey = accessKey;
}
public String getSecretKey() {
return secretKey;
}
public void setSecretKey(String secretKey) {
this.secretKey = secretKey;
}
public String getWhiteRemoteAddress() {
return whiteRemoteAddress;
}
public void setWhiteRemoteAddress(String whiteRemoteAddress) {
this.whiteRemoteAddress = whiteRemoteAddress;
}
public boolean isAdmin() {
return admin;
}
public void setAdmin(boolean admin) {
this.admin = admin;
}
public byte getDefaultTopicPerm() {
return defaultTopicPerm;
}
public void setDefaultTopicPerm(byte defaultTopicPerm) {
this.defaultTopicPerm = defaultTopicPerm;
}
public byte getDefaultGroupPerm() {
return defaultGroupPerm;
}
public void setDefaultGroupPerm(byte defaultGroupPerm) {
this.defaultGroupPerm = defaultGroupPerm;
}
public Map<String, Byte> getResourcePermMap() {
return resourcePermMap;
}
public String getRecognition() {
return recognition;
}
public void setRecognition(String recognition) {
this.recognition = recognition;
}
public int getRequestCode() {
return requestCode;
}
public void setRequestCode(int requestCode) {
this.requestCode = requestCode;
}
public String getSecretToken() {
return secretToken;
}
public void setSecretToken(String secretToken) {
this.secretToken = secretToken;
}
public RemoteAddressStrategy getRemoteAddressStrategy() {
return remoteAddressStrategy;
}
public void setRemoteAddressStrategy(RemoteAddressStrategy remoteAddressStrategy) {
this.remoteAddressStrategy = remoteAddressStrategy;
}
public String getSignature() {
return signature;
}
public void setSignature(String signature) {
this.signature = signature;
}
@Override
public String toString() {
return ToStringBuilder.reflectionToString(this);
}
public byte[] getContent() {
return content;
}
public void setContent(byte[] content) {
this.content = content;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plain;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.rocketmq.acl.AccessResource;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.acl.common.AclException;
import org.apache.rocketmq.acl.common.AclUtils;
import org.apache.rocketmq.acl.common.Permission;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader;
import org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import static org.apache.rocketmq.acl.plain.PlainAccessResource.getRetryTopic;
public class PlainAccessValidator implements AccessValidator {
private PlainPermissionLoader aclPlugEngine;
public PlainAccessValidator() {
aclPlugEngine = new PlainPermissionLoader();
}
@Override
public AccessResource parse(RemotingCommand request, String remoteAddr) {
PlainAccessResource accessResource = new PlainAccessResource();
if (remoteAddr != null && remoteAddr.contains(":")) {
accessResource.setWhiteRemoteAddress(remoteAddr.split(":")[0]);
} else {
accessResource.setWhiteRemoteAddress(remoteAddr);
}
accessResource.setRequestCode(request.getCode());
accessResource.setAccessKey(request.getExtFields().get(SessionCredentials.ACCESS_KEY));
accessResource.setSignature(request.getExtFields().get(SessionCredentials.SIGNATURE));
accessResource.setSecretToken(request.getExtFields().get(SessionCredentials.SECURITY_TOKEN));
try {
switch (request.getCode()) {
case RequestCode.SEND_MESSAGE:
accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.PUB);
break;
case RequestCode.SEND_MESSAGE_V2:
accessResource.addResourceAndPerm(request.getExtFields().get("b"), Permission.PUB);
break;
case RequestCode.CONSUMER_SEND_MSG_BACK:
accessResource.addResourceAndPerm(request.getExtFields().get("originTopic"), Permission.PUB);
accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("group")), Permission.SUB);
break;
case RequestCode.PULL_MESSAGE:
accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB);
accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("consumerGroup")), Permission.SUB);
break;
case RequestCode.QUERY_MESSAGE:
accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB);
break;
case RequestCode.HEART_BEAT:
HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
accessResource.addResourceAndPerm(getRetryTopic(data.getGroupName()), Permission.SUB);
for (SubscriptionData subscriptionData : data.getSubscriptionDataSet()) {
accessResource.addResourceAndPerm(subscriptionData.getTopic(), Permission.SUB);
}
}
break;
case RequestCode.UNREGISTER_CLIENT:
final UnregisterClientRequestHeader unregisterClientRequestHeader =
(UnregisterClientRequestHeader) request
.decodeCommandCustomHeader(UnregisterClientRequestHeader.class);
accessResource.addResourceAndPerm(getRetryTopic(unregisterClientRequestHeader.getConsumerGroup()), Permission.SUB);
break;
case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
final GetConsumerListByGroupRequestHeader getConsumerListByGroupRequestHeader =
(GetConsumerListByGroupRequestHeader) request
.decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);
accessResource.addResourceAndPerm(getRetryTopic(getConsumerListByGroupRequestHeader.getConsumerGroup()), Permission.SUB);
break;
case RequestCode.UPDATE_CONSUMER_OFFSET:
final UpdateConsumerOffsetRequestHeader updateConsumerOffsetRequestHeader =
(UpdateConsumerOffsetRequestHeader) request
.decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
accessResource.addResourceAndPerm(getRetryTopic(updateConsumerOffsetRequestHeader.getConsumerGroup()), Permission.SUB);
accessResource.addResourceAndPerm(updateConsumerOffsetRequestHeader.getTopic(), Permission.SUB);
break;
default:
break;
}
} catch (Throwable t) {
throw new AclException(t.getMessage(), t);
}
// Content
SortedMap<String, String> map = new TreeMap<String, String>();
for (Map.Entry<String, String> entry : request.getExtFields().entrySet()) {
if (!SessionCredentials.SIGNATURE.equals(entry.getKey())) {
map.put(entry.getKey(), entry.getValue());
}
}
accessResource.setContent(AclUtils.combineRequestContent(request, map));
return accessResource;
}
@Override
public void validate(AccessResource accessResource) {
aclPlugEngine.validate((PlainAccessResource) accessResource);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plain;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.rocketmq.acl.common.AclException;
import org.apache.rocketmq.acl.common.AclUtils;
import org.apache.rocketmq.acl.common.Permission;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.srvutil.FileWatchService;
public class PlainPermissionLoader {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
private static final String DEFAULT_PLAIN_ACL_FILE = "/conf/plain_acl.yml";
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private String fileHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY,
System.getenv(MixAll.ROCKETMQ_HOME_ENV));
private String fileName = System.getProperty("rocketmq.acl.plain.file", DEFAULT_PLAIN_ACL_FILE);
private Map<String/** AccessKey **/, PlainAccessResource> plainAccessResourceMap = new HashMap<>();
private List<RemoteAddressStrategy> globalWhiteRemoteAddressStrategy = new ArrayList<>();
private RemoteAddressStrategyFactory remoteAddressStrategyFactory = new RemoteAddressStrategyFactory();
private boolean isWatchStart;
public PlainPermissionLoader() {
load();
watch();
}
public void load() {
Map<String, PlainAccessResource> plainAccessResourceMap = new HashMap<>();
List<RemoteAddressStrategy> globalWhiteRemoteAddressStrategy = new ArrayList<>();
JSONObject plainAclConfData = AclUtils.getYamlDataObject(fileHome + File.separator + fileName,
JSONObject.class);
if (plainAclConfData == null || plainAclConfData.isEmpty()) {
throw new AclException(String.format("%s file is not data", fileHome + File.separator + fileName));
}
log.info("Broker plain acl conf data is : ", plainAclConfData.toString());
JSONArray globalWhiteRemoteAddressesList = plainAclConfData.getJSONArray("globalWhiteRemoteAddresses");
if (globalWhiteRemoteAddressesList != null && !globalWhiteRemoteAddressesList.isEmpty()) {
for (int i = 0; i < globalWhiteRemoteAddressesList.size(); i++) {
globalWhiteRemoteAddressStrategy.add(remoteAddressStrategyFactory.
getRemoteAddressStrategy(globalWhiteRemoteAddressesList.getString(i)));
}
}
JSONArray accounts = plainAclConfData.getJSONArray("accounts");
if (accounts != null && !accounts.isEmpty()) {
List<PlainAccessConfig> plainAccessConfigList = accounts.toJavaList(PlainAccessConfig.class);
for (PlainAccessConfig plainAccessConfig : plainAccessConfigList) {
PlainAccessResource plainAccessResource = buildPlainAccessResource(plainAccessConfig);
plainAccessResourceMap.put(plainAccessResource.getAccessKey(),plainAccessResource);
}
}
this.globalWhiteRemoteAddressStrategy = globalWhiteRemoteAddressStrategy;
this.plainAccessResourceMap = plainAccessResourceMap;
}
private void watch() {
try {
String watchFilePath = fileHome + fileName;
FileWatchService fileWatchService = new FileWatchService(new String[] {watchFilePath}, new FileWatchService.Listener() {
@Override
public void onChanged(String path) {
log.info("The plain acl yml changed, reload the context");
load();
}
});
fileWatchService.start();
log.info("Succeed to start AclWatcherService");
this.isWatchStart = true;
} catch (Exception e) {
log.error("Failed to start AclWatcherService", e);
}
}
void checkPerm(PlainAccessResource needCheckedAccess, PlainAccessResource ownedAccess) {
if (Permission.needAdminPerm(needCheckedAccess.getRequestCode()) && !ownedAccess.isAdmin()) {
throw new AclException(String.format("Need admin permission for request code=%d, but accessKey=%s is not", needCheckedAccess.getRequestCode(), ownedAccess.getAccessKey()));
}
Map<String, Byte> needCheckedPermMap = needCheckedAccess.getResourcePermMap();
Map<String, Byte> ownedPermMap = ownedAccess.getResourcePermMap();
if (needCheckedPermMap == null) {
// If the needCheckedPermMap is null,then return
return;
}
for (Map.Entry<String, Byte> needCheckedEntry : needCheckedPermMap.entrySet()) {
String resource = needCheckedEntry.getKey();
Byte neededPerm = needCheckedEntry.getValue();
boolean isGroup = PlainAccessResource.isRetryTopic(resource);
if (!ownedPermMap.containsKey(resource)) {
// Check the default perm
byte ownedPerm = isGroup ? needCheckedAccess.getDefaultGroupPerm() :
needCheckedAccess.getDefaultTopicPerm();
if (!Permission.checkPermission(neededPerm, ownedPerm)) {
throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup)));
}
continue;
}
if (!Permission.checkPermission(neededPerm, ownedPermMap.get(resource))) {
throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup)));
}
}
}
void clearPermissionInfo() {
this.plainAccessResourceMap.clear();
this.globalWhiteRemoteAddressStrategy.clear();
}
public PlainAccessResource buildPlainAccessResource(PlainAccessConfig plainAccessConfig) throws AclException {
if (plainAccessConfig.getAccessKey() == null
|| plainAccessConfig.getSecretKey() == null
|| plainAccessConfig.getAccessKey().length() <= 6
|| plainAccessConfig.getSecretKey().length() <= 6) {
throw new AclException(String.format(
"The accessKey=%s and secretKey=%s cannot be null and length should longer than 6",
plainAccessConfig.getAccessKey(), plainAccessConfig.getSecretKey()));
}
PlainAccessResource plainAccessResource = new PlainAccessResource();
plainAccessResource.setAccessKey(plainAccessConfig.getAccessKey());
plainAccessResource.setSecretKey(plainAccessConfig.getSecretKey());
plainAccessResource.setWhiteRemoteAddress(plainAccessConfig.getWhiteRemoteAddress());
plainAccessResource.setAdmin(plainAccessConfig.isAdmin());
plainAccessResource.setDefaultGroupPerm(Permission.parsePermFromString(plainAccessConfig.getDefaultGroupPerm()));
plainAccessResource.setDefaultTopicPerm(Permission.parsePermFromString(plainAccessConfig.getDefaultTopicPerm()));
Permission.parseResourcePerms(plainAccessResource, false, plainAccessConfig.getGroupPerms());
Permission.parseResourcePerms(plainAccessResource, true, plainAccessConfig.getTopicPerms());
plainAccessResource.setRemoteAddressStrategy(remoteAddressStrategyFactory.
getRemoteAddressStrategy(plainAccessResource.getWhiteRemoteAddress()));
return plainAccessResource;
}
public void validate(PlainAccessResource plainAccessResource) {
// Check the global white remote addr
for (RemoteAddressStrategy remoteAddressStrategy : globalWhiteRemoteAddressStrategy) {
if (remoteAddressStrategy.match(plainAccessResource)) {
return;
}
}
if (plainAccessResource.getAccessKey() == null) {
throw new AclException(String.format("No accessKey is configured"));
}
if (!plainAccessResourceMap.containsKey(plainAccessResource.getAccessKey())) {
throw new AclException(String.format("No acl config for %s", plainAccessResource.getAccessKey()));
}
// Check the white addr for accesskey
PlainAccessResource ownedAccess = plainAccessResourceMap.get(plainAccessResource.getAccessKey());
if (ownedAccess.getRemoteAddressStrategy().match(plainAccessResource)) {
return;
}
// Check the signature
String signature = AclUtils.calSignature(plainAccessResource.getContent(), ownedAccess.getSecretKey());
if (!signature.equals(plainAccessResource.getSignature())) {
throw new AclException(String.format("Check signature failed for accessKey=%s", plainAccessResource.getAccessKey()));
}
// Check perm of each resource
checkPerm(plainAccessResource, ownedAccess);
}
public boolean isWatchStart() {
return isWatchStart;
}
static class PlainAccessConfig {
private String accessKey;
private String secretKey;
private String whiteRemoteAddress;
private boolean admin;
private String defaultTopicPerm;
private String defaultGroupPerm;
private List<String> topicPerms;
private List<String> groupPerms;
public String getAccessKey() {
return accessKey;
}
public void setAccessKey(String accessKey) {
this.accessKey = accessKey;
}
public String getSecretKey() {
return secretKey;
}
public void setSecretKey(String secretKey) {
this.secretKey = secretKey;
}
public String getWhiteRemoteAddress() {
return whiteRemoteAddress;
}
public void setWhiteRemoteAddress(String whiteRemoteAddress) {
this.whiteRemoteAddress = whiteRemoteAddress;
}
public boolean isAdmin() {
return admin;
}
public void setAdmin(boolean admin) {
this.admin = admin;
}
public String getDefaultTopicPerm() {
return defaultTopicPerm;
}
public void setDefaultTopicPerm(String defaultTopicPerm) {
this.defaultTopicPerm = defaultTopicPerm;
}
public String getDefaultGroupPerm() {
return defaultGroupPerm;
}
public void setDefaultGroupPerm(String defaultGroupPerm) {
this.defaultGroupPerm = defaultGroupPerm;
}
public List<String> getTopicPerms() {
return topicPerms;
}
public void setTopicPerms(List<String> topicPerms) {
this.topicPerms = topicPerms;
}
public List<String> getGroupPerms() {
return groupPerms;
}
public void setGroupPerms(List<String> groupPerms) {
this.groupPerms = groupPerms;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plain;
public interface RemoteAddressStrategy {
boolean match(PlainAccessResource plainAccessResource);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plain;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclException;
import org.apache.rocketmq.acl.common.AclUtils;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
public class RemoteAddressStrategyFactory {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
public static final NullRemoteAddressStrategy NULL_NET_ADDRESS_STRATEGY = new NullRemoteAddressStrategy();
public static final BlankRemoteAddressStrategy BLANK_NET_ADDRESS_STRATEGY = new BlankRemoteAddressStrategy();
public RemoteAddressStrategy getRemoteAddressStrategy(PlainAccessResource plainAccessResource) {
return getRemoteAddressStrategy(plainAccessResource.getWhiteRemoteAddress());
}
public RemoteAddressStrategy getRemoteAddressStrategy(String remoteAddr) {
if (StringUtils.isBlank(remoteAddr)) {
return BLANK_NET_ADDRESS_STRATEGY;
}
if ("*".equals(remoteAddr)) {
return NULL_NET_ADDRESS_STRATEGY;
}
if (remoteAddr.endsWith("}")) {
String[] strArray = StringUtils.split(remoteAddr, ".");
String four = strArray[3];
if (!four.startsWith("{")) {
throw new AclException(String.format("MultipleRemoteAddressStrategy netaddress examine scope Exception netaddress", remoteAddr));
}
return new MultipleRemoteAddressStrategy(AclUtils.getAddreeStrArray(remoteAddr, four));
} else if (AclUtils.isColon(remoteAddr)) {
return new MultipleRemoteAddressStrategy(StringUtils.split(remoteAddr, ","));
} else if (AclUtils.isAsterisk(remoteAddr) || AclUtils.isMinus(remoteAddr)) {
return new RangeRemoteAddressStrategy(remoteAddr);
}
return new OneRemoteAddressStrategy(remoteAddr);
}
public static class NullRemoteAddressStrategy implements RemoteAddressStrategy {
@Override
public boolean match(PlainAccessResource plainAccessResource) {
return true;
}
}
public static class BlankRemoteAddressStrategy implements RemoteAddressStrategy {
@Override
public boolean match(PlainAccessResource plainAccessResource) {
return false;
}
}
public static class MultipleRemoteAddressStrategy implements RemoteAddressStrategy {
private final Set<String> multipleSet = new HashSet<>();
public MultipleRemoteAddressStrategy(String[] strArray) {
for (String netaddress : strArray) {
AclUtils.verify(netaddress, 4);
multipleSet.add(netaddress);
}
}
@Override
public boolean match(PlainAccessResource plainAccessResource) {
return multipleSet.contains(plainAccessResource.getWhiteRemoteAddress());
}
}
public static class OneRemoteAddressStrategy implements RemoteAddressStrategy {
private String netaddress;
public OneRemoteAddressStrategy(String netaddress) {
this.netaddress = netaddress;
AclUtils.verify(netaddress, 4);
}
@Override
public boolean match(PlainAccessResource plainAccessResource) {
return netaddress.equals(plainAccessResource.getWhiteRemoteAddress());
}
}
public static class RangeRemoteAddressStrategy implements RemoteAddressStrategy {
private String head;
private int start;
private int end;
private int index;
public RangeRemoteAddressStrategy(String remoteAddr) {
String[] strArray = StringUtils.split(remoteAddr, ".");
if (analysis(strArray, 2) || analysis(strArray, 3)) {
AclUtils.verify(remoteAddr, index - 1);
StringBuffer sb = new StringBuffer().append(strArray[0].trim()).append(".").append(strArray[1].trim()).append(".");
if (index == 3) {
sb.append(strArray[2].trim()).append(".");
}
this.head = sb.toString();
}
}
private boolean analysis(String[] strArray, int index) {
String value = strArray[index].trim();
this.index = index;
if ("*".equals(value)) {
setValue(0, 255);
} else if (AclUtils.isMinus(value)) {
if (value.indexOf("-") == 0) {
throw new AclException(String.format("RangeRemoteAddressStrategy netaddress examine scope Exception value %s ", value));
}
String[] valueArray = StringUtils.split(value, "-");
this.start = Integer.valueOf(valueArray[0]);
this.end = Integer.valueOf(valueArray[1]);
if (!(AclUtils.isScope(end) && AclUtils.isScope(start) && start <= end)) {
throw new AclException(String.format("RangeRemoteAddressStrategy netaddress examine scope Exception start is %s , end is %s", start, end));
}
}
return this.end > 0 ? true : false;
}
private void setValue(int start, int end) {
this.start = start;
this.end = end;
}
@Override
public boolean match(PlainAccessResource plainAccessResource) {
String netAddress = plainAccessResource.getWhiteRemoteAddress();
if (netAddress.startsWith(this.head)) {
String value;
if (index == 3) {
value = netAddress.substring(this.head.length());
} else {
value = netAddress.substring(this.head.length(), netAddress.lastIndexOf('.'));
}
Integer address = Integer.valueOf(value);
if (address >= this.start && address <= this.end) {
return true;
}
}
return false;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.common;
import org.junit.Test;
public class AclSignerTest {
@Test(expected = Exception.class)
public void calSignatureExceptionTest(){
AclSigner.calSignature(new byte[]{},"");
}
@Test
public void calSignatureTest(){
AclSigner.calSignature("RocketMQ","12345678");
AclSigner.calSignature("RocketMQ".getBytes(),"12345678");
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.common;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.junit.Assert;
import org.junit.Test;
public class AclUtilsTest {
@Test
public void getAddreeStrArray() {
String address = "1.1.1.{1,2,3,4}";
String[] addressArray = AclUtils.getAddreeStrArray(address, "{1,2,3,4}");
List<String> newAddressList = new ArrayList<>();
for (String a : addressArray) {
newAddressList.add(a);
}
List<String> addressList = new ArrayList<>();
addressList.add("1.1.1.1");
addressList.add("1.1.1.2");
addressList.add("1.1.1.3");
addressList.add("1.1.1.4");
Assert.assertEquals(newAddressList, addressList);
}
@Test
public void isScopeStringArray() {
String adderss = "12";
for (int i = 0; i < 6; i++) {
boolean isScope = AclUtils.isScope(adderss, 4);
if (i == 3) {
Assert.assertTrue(isScope);
} else {
Assert.assertFalse(isScope);
}
adderss = adderss + ".12";
}
}
@Test
public void isScopeArray() {
String[] adderss = StringUtils.split("12.12.12.12", ".");
boolean isScope = AclUtils.isScope(adderss, 4);
Assert.assertTrue(isScope);
isScope = AclUtils.isScope(adderss, 3);
Assert.assertTrue(isScope);
adderss = StringUtils.split("12.12.1222.1222", ".");
isScope = AclUtils.isScope(adderss, 4);
Assert.assertFalse(isScope);
isScope = AclUtils.isScope(adderss, 3);
Assert.assertFalse(isScope);
}
@Test
public void isScopeStringTest() {
for (int i = 0; i < 256; i++) {
boolean isScope = AclUtils.isScope(i + "");
Assert.assertTrue(isScope);
}
boolean isScope = AclUtils.isScope("-1");
Assert.assertFalse(isScope);
isScope = AclUtils.isScope("256");
Assert.assertFalse(isScope);
}
@Test
public void isScopeTest() {
for (int i = 0; i < 256; i++) {
boolean isScope = AclUtils.isScope(i);
Assert.assertTrue(isScope);
}
boolean isScope = AclUtils.isScope(-1);
Assert.assertFalse(isScope);
isScope = AclUtils.isScope(256);
Assert.assertFalse(isScope);
}
@Test
public void isAsteriskTest() {
boolean isAsterisk = AclUtils.isAsterisk("*");
Assert.assertTrue(isAsterisk);
isAsterisk = AclUtils.isAsterisk(",");
Assert.assertFalse(isAsterisk);
}
@Test
public void isColonTest() {
boolean isColon = AclUtils.isColon(",");
Assert.assertTrue(isColon);
isColon = AclUtils.isColon("-");
Assert.assertFalse(isColon);
}
@Test
public void isMinusTest() {
boolean isMinus = AclUtils.isMinus("-");
Assert.assertTrue(isMinus);
isMinus = AclUtils.isMinus("*");
Assert.assertFalse(isMinus);
}
@SuppressWarnings("unchecked")
@Test
public void getYamlDataObjectTest() {
Map<String, Object> map = AclUtils.getYamlDataObject("src/test/resources/conf/plain_acl.yml", Map.class);
Assert.assertFalse(map.isEmpty());
}
@Test(expected = Exception.class)
public void getYamlDataObjectExceptionTest() {
AclUtils.getYamlDataObject("plain_acl.yml", Map.class);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.common;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.rocketmq.acl.plain.PlainAccessResource;
import org.junit.Assert;
import org.junit.Test;
public class PermissionTest {
@Test
public void fromStringGetPermissionTest() {
byte perm = Permission.parsePermFromString("PUB");
Assert.assertEquals(perm, Permission.PUB);
perm = Permission.parsePermFromString("SUB");
Assert.assertEquals(perm, Permission.SUB);
perm = Permission.parsePermFromString("PUB|SUB");
Assert.assertEquals(perm, Permission.PUB|Permission.SUB);
perm = Permission.parsePermFromString("SUB|PUB");
Assert.assertEquals(perm, Permission.PUB|Permission.SUB);
perm = Permission.parsePermFromString("DENY");
Assert.assertEquals(perm, Permission.DENY);
perm = Permission.parsePermFromString("1");
Assert.assertEquals(perm, Permission.DENY);
perm = Permission.parsePermFromString(null);
Assert.assertEquals(perm, Permission.DENY);
}
@Test
public void checkPermissionTest() {
boolean boo = Permission.checkPermission(Permission.DENY, Permission.DENY);
Assert.assertFalse(boo);
boo = Permission.checkPermission(Permission.PUB, Permission.PUB);
Assert.assertTrue(boo);
boo = Permission.checkPermission(Permission.SUB, Permission.SUB);
Assert.assertTrue(boo);
boo = Permission.checkPermission(Permission.PUB, (byte) (Permission.PUB|Permission.SUB));
Assert.assertTrue(boo);
boo = Permission.checkPermission(Permission.SUB, (byte) (Permission.PUB|Permission.SUB));
Assert.assertTrue(boo);
boo = Permission.checkPermission(Permission.ANY, (byte) (Permission.PUB|Permission.SUB));
Assert.assertTrue(boo);
boo = Permission.checkPermission(Permission.ANY, Permission.SUB);
Assert.assertTrue(boo);
boo = Permission.checkPermission(Permission.ANY, Permission.PUB);
Assert.assertTrue(boo);
boo = Permission.checkPermission(Permission.DENY, Permission.ANY);
Assert.assertFalse(boo);
boo = Permission.checkPermission(Permission.DENY, Permission.PUB);
Assert.assertFalse(boo);
boo = Permission.checkPermission(Permission.DENY, Permission.SUB);
Assert.assertFalse(boo);
}
@Test(expected = AclException.class)
public void setTopicPermTest() {
PlainAccessResource plainAccessResource = new PlainAccessResource();
Map<String, Byte> resourcePermMap = plainAccessResource.getResourcePermMap();
Permission.parseResourcePerms(plainAccessResource, false, null);
Assert.assertNull(resourcePermMap);
List<String> groups = new ArrayList<>();
Permission.parseResourcePerms(plainAccessResource, false, groups);
Assert.assertNull(resourcePermMap);
groups.add("groupA=DENY");
groups.add("groupB=PUB|SUB");
groups.add("groupC=PUB");
Permission.parseResourcePerms(plainAccessResource, false, groups);
resourcePermMap = plainAccessResource.getResourcePermMap();
byte perm = resourcePermMap.get(PlainAccessResource.getRetryTopic("groupA"));
Assert.assertEquals(perm, Permission.DENY);
perm = resourcePermMap.get(PlainAccessResource.getRetryTopic("groupB"));
Assert.assertEquals(perm,Permission.PUB|Permission.SUB);
perm = resourcePermMap.get(PlainAccessResource.getRetryTopic("groupC"));
Assert.assertEquals(perm, Permission.PUB);
List<String> topics = new ArrayList<>();
topics.add("topicA=DENY");
topics.add("topicB=PUB|SUB");
topics.add("topicC=PUB");
Permission.parseResourcePerms(plainAccessResource, true, topics);
perm = resourcePermMap.get("topicA");
Assert.assertEquals(perm, Permission.DENY);
perm = resourcePermMap.get("topicB");
Assert.assertEquals(perm, Permission.PUB|Permission.SUB);
perm = resourcePermMap.get("topicC");
Assert.assertEquals(perm, Permission.PUB);
List<String> erron = new ArrayList<>();
erron.add("");
Permission.parseResourcePerms(plainAccessResource, false, erron);
}
@Test
public void checkAdminCodeTest() {
Set<Integer> code = new HashSet<>();
code.add(17);
code.add(25);
code.add(215);
code.add(200);
code.add(207);
for (int i = 0; i < 400; i++) {
boolean boo = Permission.needAdminPerm(i);
if (boo) {
Assert.assertTrue(code.contains(i));
}
}
}
@Test
public void AclExceptionTest(){
AclException aclException = new AclException("CAL_SIGNATURE_FAILED",10015);
AclException aclExceptionWithMessage = new AclException("CAL_SIGNATURE_FAILED",10015,"CAL_SIGNATURE_FAILED Exception");
Assert.assertEquals(aclException.getCode(),10015);
Assert.assertEquals(aclExceptionWithMessage.getStatus(),"CAL_SIGNATURE_FAILED");
aclException.setCode(10016);
Assert.assertEquals(aclException.getCode(),10016);
aclException.setStatus("netaddress examine scope Exception netaddress");
Assert.assertEquals(aclException.getStatus(),"netaddress examine scope Exception netaddress");
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.common;
import org.junit.Assert;
import org.junit.Test;
import java.util.Properties;
public class SessionCredentialsTest {
@Test
public void equalsTest(){
SessionCredentials sessionCredentials=new SessionCredentials("RocketMQ","12345678");
sessionCredentials.setSecurityToken("abcd");
SessionCredentials other=new SessionCredentials("RocketMQ","12345678","abcd");
Assert.assertTrue(sessionCredentials.equals(other));
}
@Test
public void updateContentTest(){
SessionCredentials sessionCredentials=new SessionCredentials();
Properties properties=new Properties();
properties.setProperty(SessionCredentials.ACCESS_KEY,"RocketMQ");
properties.setProperty(SessionCredentials.SECRET_KEY,"12345678");
properties.setProperty(SessionCredentials.SECURITY_TOKEN,"abcd");
sessionCredentials.updateContent(properties);
}
@Test
public void SessionCredentialHashCodeTest(){
SessionCredentials sessionCredentials=new SessionCredentials();
Properties properties=new Properties();
properties.setProperty(SessionCredentials.ACCESS_KEY,"RocketMQ");
properties.setProperty(SessionCredentials.SECRET_KEY,"12345678");
properties.setProperty(SessionCredentials.SECURITY_TOKEN,"abcd");
sessionCredentials.updateContent(properties);
Assert.assertEquals(sessionCredentials.hashCode(),353652211);
}
@Test
public void SessionCredentialEqualsTest(){
SessionCredentials sessionCredential1 =new SessionCredentials();
Properties properties1=new Properties();
properties1.setProperty(SessionCredentials.ACCESS_KEY,"RocketMQ");
properties1.setProperty(SessionCredentials.SECRET_KEY,"12345678");
properties1.setProperty(SessionCredentials.SECURITY_TOKEN,"abcd");
sessionCredential1.updateContent(properties1);
SessionCredentials sessionCredential2 =new SessionCredentials();
Properties properties2=new Properties();
properties2.setProperty(SessionCredentials.ACCESS_KEY,"RocketMQ");
properties2.setProperty(SessionCredentials.SECRET_KEY,"12345678");
properties2.setProperty(SessionCredentials.SECURITY_TOKEN,"abcd");
sessionCredential2.updateContent(properties2);
Assert.assertTrue(sessionCredential2.equals(sessionCredential1));
sessionCredential2.setSecretKey("1234567899");
sessionCredential2.setSignature("1234567899");
Assert.assertFalse(sessionCredential2.equals(sessionCredential1));
}
@Test
public void SessionCredentialToStringTest(){
SessionCredentials sessionCredential1 =new SessionCredentials();
Properties properties1=new Properties();
properties1.setProperty(SessionCredentials.ACCESS_KEY,"RocketMQ");
properties1.setProperty(SessionCredentials.SECRET_KEY,"12345678");
properties1.setProperty(SessionCredentials.SECURITY_TOKEN,"abcd");
sessionCredential1.updateContent(properties1);
Assert.assertEquals(sessionCredential1.toString(),
"SessionCredentials [accessKey=RocketMQ, secretKey=12345678, signature=null, SecurityToken=abcd]");
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plain;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Set;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.AclException;
import org.apache.rocketmq.acl.common.AclUtils;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.header.*;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
import org.apache.rocketmq.common.protocol.heartbeat.ProducerData;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.netty.CodecHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class PlainAccessValidatorTest {
private PlainAccessValidator plainAccessValidator;
private AclClientRPCHook aclClient;
private SessionCredentials sessionCredentials;
@Before
public void init() {
System.setProperty("rocketmq.home.dir", "src/test/resources");
plainAccessValidator = new PlainAccessValidator();
sessionCredentials = new SessionCredentials();
sessionCredentials.setAccessKey("RocketMQ");
sessionCredentials.setSecretKey("12345678");
sessionCredentials.setSecurityToken("87654321");
aclClient = new AclClientRPCHook(sessionCredentials);
}
@Test
public void contentTest() {
SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader();
messageRequestHeader.setTopic("topicA");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader);
aclClient.doBeforeRequest("", remotingCommand);
ByteBuffer buf = CodecHelper.encode(remotingCommand);
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(CodecHelper.decode(buf), "127.0.0.1");
String signature = AclUtils.calSignature(accessResource.getContent(), sessionCredentials.getSecretKey());
Assert.assertEquals(accessResource.getSignature(), signature);
}
@Test
public void validateTest() {
SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader();
messageRequestHeader.setTopic("topicB");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader);
aclClient.doBeforeRequest("", remotingCommand);
ByteBuffer buf = CodecHelper.encode(remotingCommand);
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(CodecHelper.decode(buf), "192.168.0.1");
plainAccessValidator.validate(accessResource);
}
@Test
public void validateSendMessageTest() {
SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader();
messageRequestHeader.setTopic("topicB");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader);
aclClient.doBeforeRequest("", remotingCommand);
ByteBuffer buf = CodecHelper.encodeHeader(remotingCommand);
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(CodecHelper.decode(buf), "192.168.0.1");
plainAccessValidator.validate(accessResource);
}
@Test
public void validateSendMessageV2Test() {
SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader();
messageRequestHeader.setTopic("topicC");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(messageRequestHeader));
aclClient.doBeforeRequest("", remotingCommand);
ByteBuffer buf = CodecHelper.encodeHeader(remotingCommand);
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(CodecHelper.decode(buf), "192.168.0.1:9876");
plainAccessValidator.validate(accessResource);
}
@Test
public void validatePullMessageTest() {
PullMessageRequestHeader pullMessageRequestHeader=new PullMessageRequestHeader();
pullMessageRequestHeader.setTopic("topicC");
pullMessageRequestHeader.setConsumerGroup("consumerGroupA");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE,pullMessageRequestHeader);
aclClient.doBeforeRequest("", remotingCommand);
ByteBuffer buf = CodecHelper.encodeHeader(remotingCommand);
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(CodecHelper.decode(buf), "192.168.0.1:9876");
plainAccessValidator.validate(accessResource);
}
@Test
public void validateConsumeMessageBackTest() {
ConsumerSendMsgBackRequestHeader consumerSendMsgBackRequestHeader=new ConsumerSendMsgBackRequestHeader();
consumerSendMsgBackRequestHeader.setOriginTopic("topicC");
consumerSendMsgBackRequestHeader.setGroup("consumerGroupA");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK,consumerSendMsgBackRequestHeader);
aclClient.doBeforeRequest("", remotingCommand);
ByteBuffer buf = CodecHelper.encodeHeader(remotingCommand);
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(CodecHelper.decode(buf), "192.168.0.1:9876");
plainAccessValidator.validate(accessResource);
}
@Test
public void validateQueryMessageTest() {
QueryMessageRequestHeader queryMessageRequestHeader=new QueryMessageRequestHeader();
queryMessageRequestHeader.setTopic("topicC");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.QUERY_MESSAGE,queryMessageRequestHeader);
aclClient.doBeforeRequest("", remotingCommand);
ByteBuffer buf = CodecHelper.encodeHeader(remotingCommand);
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(CodecHelper.decode(buf), "192.168.0.1:9876");
plainAccessValidator.validate(accessResource);
}
@Test
public void validateHeartBeatTest() {
HeartbeatData heartbeatData=new HeartbeatData();
Set<ProducerData> producerDataSet=new HashSet<>();
Set<ConsumerData> consumerDataSet=new HashSet<>();
Set<SubscriptionData> subscriptionDataSet=new HashSet<>();
ProducerData producerData=new ProducerData();
producerData.setGroupName("producerGroupA");
ConsumerData consumerData=new ConsumerData();
consumerData.setGroupName("consumerGroupA");
SubscriptionData subscriptionData=new SubscriptionData();
subscriptionData.setTopic("topicC");
producerDataSet.add(producerData);
consumerDataSet.add(consumerData);
subscriptionDataSet.add(subscriptionData);
consumerData.setSubscriptionDataSet(subscriptionDataSet);
heartbeatData.setProducerDataSet(producerDataSet);
heartbeatData.setConsumerDataSet(consumerDataSet);
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT,null);
remotingCommand.setBody(heartbeatData.encode());
aclClient.doBeforeRequest("", remotingCommand);
ByteBuffer buf = CodecHelper.encode(remotingCommand);
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(CodecHelper.decode(buf), "192.168.0.1:9876");
plainAccessValidator.validate(accessResource);
}
@Test
public void validateUnRegisterClientTest() {
UnregisterClientRequestHeader unregisterClientRequestHeader=new UnregisterClientRequestHeader();
unregisterClientRequestHeader.setConsumerGroup("consumerGroupA");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT,unregisterClientRequestHeader);
aclClient.doBeforeRequest("", remotingCommand);
ByteBuffer buf = CodecHelper.encodeHeader(remotingCommand);
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(CodecHelper.decode(buf), "192.168.0.1:9876");
plainAccessValidator.validate(accessResource);
}
@Test
public void validateGetConsumerListByGroupTest() {
GetConsumerListByGroupRequestHeader getConsumerListByGroupRequestHeader=new GetConsumerListByGroupRequestHeader();
getConsumerListByGroupRequestHeader.setConsumerGroup("consumerGroupA");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_LIST_BY_GROUP,getConsumerListByGroupRequestHeader);
aclClient.doBeforeRequest("", remotingCommand);
ByteBuffer buf = CodecHelper.encodeHeader(remotingCommand);
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(CodecHelper.decode(buf), "192.168.0.1:9876");
plainAccessValidator.validate(accessResource);
}
@Test
public void validateUpdateConsumerOffSetTest() {
UpdateConsumerOffsetRequestHeader updateConsumerOffsetRequestHeader=new UpdateConsumerOffsetRequestHeader();
updateConsumerOffsetRequestHeader.setConsumerGroup("consumerGroupA");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET,updateConsumerOffsetRequestHeader);
aclClient.doBeforeRequest("", remotingCommand);
ByteBuffer buf = CodecHelper.encodeHeader(remotingCommand);
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(CodecHelper.decode(buf), "192.168.0.1:9876");
plainAccessValidator.validate(accessResource);
}
@Test(expected = AclException.class)
public void validateNullAccessKeyTest() {
SessionCredentials sessionCredentials=new SessionCredentials();
sessionCredentials.setAccessKey("RocketMQ1");
sessionCredentials.setSecretKey("1234");
AclClientRPCHook aclClientRPCHook=new AclClientRPCHook(sessionCredentials);
SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader();
messageRequestHeader.setTopic("topicB");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader);
aclClientRPCHook.doBeforeRequest("", remotingCommand);
ByteBuffer buf = CodecHelper.encodeHeader(remotingCommand);
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(CodecHelper.decode(buf), "192.168.1.1");
plainAccessValidator.validate(accessResource);
}
@Test(expected = AclException.class)
public void validateErrorSecretKeyTest() {
SessionCredentials sessionCredentials=new SessionCredentials();
sessionCredentials.setAccessKey("RocketMQ");
sessionCredentials.setSecretKey("1234");
AclClientRPCHook aclClientRPCHook=new AclClientRPCHook(sessionCredentials);
SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader();
messageRequestHeader.setTopic("topicB");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader);
aclClientRPCHook.doBeforeRequest("", remotingCommand);
ByteBuffer buf = CodecHelper.encodeHeader(remotingCommand);
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(CodecHelper.decode(buf), "192.168.1.1");
plainAccessValidator.validate(accessResource);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plain;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.acl.common.AclException;
import org.apache.rocketmq.acl.common.Permission;
import org.apache.rocketmq.acl.plain.PlainPermissionLoader.PlainAccessConfig;
import org.apache.rocketmq.common.UtilAll;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class PlainPermissionLoaderTest {
PlainPermissionLoader plainPermissionLoader;
PlainAccessResource PUBPlainAccessResource;
PlainAccessResource SUBPlainAccessResource;
PlainAccessResource ANYPlainAccessResource;
PlainAccessResource DENYPlainAccessResource;
PlainAccessResource plainAccessResource = new PlainAccessResource();
PlainAccessConfig plainAccessConfig = new PlainAccessConfig();
PlainAccessResource plainAccessResourceTwo = new PlainAccessResource();
Set<Integer> adminCode = new HashSet<>();
@Before
public void init() throws NoSuchFieldException, SecurityException, IOException {
// UPDATE_AND_CREATE_TOPIC
adminCode.add(17);
// UPDATE_BROKER_CONFIG
adminCode.add(25);
// DELETE_TOPIC_IN_BROKER
adminCode.add(215);
// UPDATE_AND_CREATE_SUBSCRIPTIONGROUP
adminCode.add(200);
// DELETE_SUBSCRIPTIONGROUP
adminCode.add(207);
PUBPlainAccessResource = clonePlainAccessResource(Permission.PUB);
SUBPlainAccessResource = clonePlainAccessResource(Permission.SUB);
ANYPlainAccessResource = clonePlainAccessResource(Permission.ANY);
DENYPlainAccessResource = clonePlainAccessResource(Permission.DENY);
System.setProperty("rocketmq.home.dir", "src/test/resources");
System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl.yml");
plainPermissionLoader = new PlainPermissionLoader();
}
public PlainAccessResource clonePlainAccessResource(byte perm) {
PlainAccessResource painAccessResource = new PlainAccessResource();
painAccessResource.setAccessKey("RocketMQ");
painAccessResource.setSecretKey("12345678");
painAccessResource.setWhiteRemoteAddress("127.0." + perm + ".*");
painAccessResource.setDefaultGroupPerm(perm);
painAccessResource.setDefaultTopicPerm(perm);
painAccessResource.addResourceAndPerm(PlainAccessResource.getRetryTopic("groupA"), Permission.PUB);
painAccessResource.addResourceAndPerm(PlainAccessResource.getRetryTopic("groupB"), Permission.SUB);
painAccessResource.addResourceAndPerm(PlainAccessResource.getRetryTopic("groupC"), Permission.ANY);
painAccessResource.addResourceAndPerm(PlainAccessResource.getRetryTopic("groupD"), Permission.DENY);
painAccessResource.addResourceAndPerm("topicA", Permission.PUB);
painAccessResource.addResourceAndPerm("topicB", Permission.SUB);
painAccessResource.addResourceAndPerm("topicC", Permission.ANY);
painAccessResource.addResourceAndPerm("topicD", Permission.DENY);
return painAccessResource;
}
@Test
public void buildPlainAccessResourceTest() {
PlainAccessResource plainAccessResource = null;
PlainAccessConfig plainAccess = new PlainAccessConfig();
plainAccess.setAccessKey("RocketMQ");
plainAccess.setSecretKey("12345678");
plainAccessResource = plainPermissionLoader.buildPlainAccessResource(plainAccess);
Assert.assertEquals(plainAccessResource.getAccessKey(), "RocketMQ");
Assert.assertEquals(plainAccessResource.getSecretKey(), "12345678");
plainAccess.setWhiteRemoteAddress("127.0.0.1");
plainAccessResource = plainPermissionLoader.buildPlainAccessResource(plainAccess);
Assert.assertEquals(plainAccessResource.getWhiteRemoteAddress(), "127.0.0.1");
plainAccess.setAdmin(true);
plainAccessResource = plainPermissionLoader.buildPlainAccessResource(plainAccess);
Assert.assertEquals(plainAccessResource.isAdmin(), true);
List<String> groups = new ArrayList<String>();
groups.add("groupA=DENY");
groups.add("groupB=PUB|SUB");
groups.add("groupC=PUB");
plainAccess.setGroupPerms(groups);
plainAccessResource = plainPermissionLoader.buildPlainAccessResource(plainAccess);
Map<String, Byte> resourcePermMap = plainAccessResource.getResourcePermMap();
Assert.assertEquals(resourcePermMap.size(), 3);
Assert.assertEquals(resourcePermMap.get(PlainAccessResource.getRetryTopic("groupA")).byteValue(), Permission.DENY);
Assert.assertEquals(resourcePermMap.get(PlainAccessResource.getRetryTopic("groupB")).byteValue(), Permission.PUB|Permission.SUB);
Assert.assertEquals(resourcePermMap.get(PlainAccessResource.getRetryTopic("groupC")).byteValue(), Permission.PUB);
List<String> topics = new ArrayList<String>();
topics.add("topicA=DENY");
topics.add("topicB=PUB|SUB");
topics.add("topicC=PUB");
plainAccess.setTopicPerms(topics);
plainAccessResource = plainPermissionLoader.buildPlainAccessResource(plainAccess);
resourcePermMap = plainAccessResource.getResourcePermMap();
Assert.assertEquals(resourcePermMap.size(), 6);
Assert.assertEquals(resourcePermMap.get("topicA").byteValue(), Permission.DENY);
Assert.assertEquals(resourcePermMap.get("topicB").byteValue(), Permission.PUB|Permission.SUB);
Assert.assertEquals(resourcePermMap.get("topicC").byteValue(), Permission.PUB);
}
@Test(expected = AclException.class)
public void checkPermAdmin() {
PlainAccessResource plainAccessResource = new PlainAccessResource();
plainAccessResource.setRequestCode(17);
plainPermissionLoader.checkPerm(plainAccessResource, PUBPlainAccessResource);
}
@Test
public void checkPerm() {
PlainAccessResource plainAccessResource = new PlainAccessResource();
plainAccessResource.addResourceAndPerm("topicA", Permission.PUB);
plainPermissionLoader.checkPerm(plainAccessResource, PUBPlainAccessResource);
plainAccessResource.addResourceAndPerm("topicB", Permission.SUB);
plainPermissionLoader.checkPerm(plainAccessResource, ANYPlainAccessResource);
plainAccessResource = new PlainAccessResource();
plainAccessResource.addResourceAndPerm("topicB", Permission.SUB);
plainPermissionLoader.checkPerm(plainAccessResource, SUBPlainAccessResource);
plainAccessResource.addResourceAndPerm("topicA", Permission.PUB);
plainPermissionLoader.checkPerm(plainAccessResource, ANYPlainAccessResource);
}
@Test(expected = AclException.class)
public void checkErrorPerm() {
plainAccessResource = new PlainAccessResource();
plainAccessResource.addResourceAndPerm("topicF", Permission.SUB);
plainPermissionLoader.checkPerm(plainAccessResource, SUBPlainAccessResource);
}
@Test(expected = AclException.class)
public void accountNullTest() {
plainAccessConfig.setAccessKey(null);
plainPermissionLoader.buildPlainAccessResource(plainAccessConfig);
}
@Test(expected = AclException.class)
public void accountThanTest() {
plainAccessConfig.setAccessKey("123");
plainPermissionLoader.buildPlainAccessResource(plainAccessConfig);
}
@Test(expected = AclException.class)
public void passWordtNullTest() {
plainAccessConfig.setAccessKey(null);
plainPermissionLoader.buildPlainAccessResource(plainAccessConfig);
}
@Test(expected = AclException.class)
public void passWordThanTest() {
plainAccessConfig.setAccessKey("123");
plainPermissionLoader.buildPlainAccessResource(plainAccessConfig);
}
@Test(expected = AclException.class)
public void testPlainAclPlugEngineInit() {
System.setProperty("rocketmq.home.dir", "");
new PlainPermissionLoader().load();
}
@SuppressWarnings("unchecked")
@Test
public void cleanAuthenticationInfoTest() throws IllegalAccessException {
//plainPermissionLoader.addPlainAccessResource(plainAccessResource);
Map<String, List<PlainAccessResource>> plainAccessResourceMap = (Map<String, List<PlainAccessResource>>) FieldUtils.readDeclaredField(plainPermissionLoader, "plainAccessResourceMap", true);
Assert.assertFalse(plainAccessResourceMap.isEmpty());
plainPermissionLoader.clearPermissionInfo();
plainAccessResourceMap = (Map<String, List<PlainAccessResource>>) FieldUtils.readDeclaredField(plainPermissionLoader, "plainAccessResourceMap", true);
Assert.assertTrue(plainAccessResourceMap.isEmpty());
}
@Test
public void isWatchStartTest() {
PlainPermissionLoader plainPermissionLoader = new PlainPermissionLoader();
Assert.assertTrue(plainPermissionLoader.isWatchStart());
}
@Test
public void testWatch() throws IOException, IllegalAccessException ,InterruptedException{
System.setProperty("rocketmq.home.dir", "src/test/resources");
System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl-test.yml");
String fileName =System.getProperty("rocketmq.home.dir", "src/test/resources")+System.getProperty("rocketmq.acl.plain.file", "/conf/plain_acl.yml");
File transport = new File(fileName);
transport.delete();
transport.createNewFile();
FileWriter writer = new FileWriter(transport);
writer.write("accounts:\r\n");
writer.write("- accessKey: watchrocketmq\r\n");
writer.write(" secretKey: 12345678\r\n");
writer.write(" whiteRemoteAddress: 127.0.0.1\r\n");
writer.write(" admin: true\r\n");
writer.flush();
writer.close();
PlainPermissionLoader plainPermissionLoader = new PlainPermissionLoader();
Assert.assertTrue(plainPermissionLoader.isWatchStart());
{
Map<String, PlainAccessResource> plainAccessResourceMap = (Map<String, PlainAccessResource>) FieldUtils.readDeclaredField(plainPermissionLoader, "plainAccessResourceMap", true);
PlainAccessResource accessResource = plainAccessResourceMap.get("watchrocketmq");
Assert.assertNotNull(accessResource);
Assert.assertEquals(accessResource.getSecretKey(), "12345678");
Assert.assertTrue(accessResource.isAdmin());
}
writer = new FileWriter(new File(fileName), true);
writer.write("- accessKey: watchrocketmq1\r\n");
writer.write(" secretKey: 88888888\r\n");
writer.write(" whiteRemoteAddress: 127.0.0.1\r\n");
writer.write(" admin: false\r\n");
writer.flush();
writer.close();
Thread.sleep(1000);
{
Map<String, PlainAccessResource> plainAccessResourceMap = (Map<String, PlainAccessResource>) FieldUtils.readDeclaredField(plainPermissionLoader, "plainAccessResourceMap", true);
PlainAccessResource accessResource = plainAccessResourceMap.get("watchrocketmq1");
Assert.assertNotNull(accessResource);
Assert.assertEquals(accessResource.getSecretKey(), "88888888");
Assert.assertFalse(accessResource.isAdmin());
}
transport.delete();
System.setProperty("rocketmq.home.dir", "src/test/resources");
System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl.yml");
}
@Test(expected = AclException.class)
public void initializeTest() {
System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl_null.yml");
new PlainPermissionLoader();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plain;
import org.apache.rocketmq.acl.common.AclException;
import org.junit.Assert;
import org.junit.Test;
public class RemoteAddressStrategyTest {
RemoteAddressStrategyFactory remoteAddressStrategyFactory = new RemoteAddressStrategyFactory();
@Test
public void netaddressStrategyFactoryExceptionTest() {
PlainAccessResource plainAccessResource = new PlainAccessResource();
remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
Assert.assertEquals(remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource).getClass(),
RemoteAddressStrategyFactory.BlankRemoteAddressStrategy.class);
}
@Test
public void netaddressStrategyFactoryTest() {
PlainAccessResource plainAccessResource = new PlainAccessResource();
plainAccessResource.setWhiteRemoteAddress("*");
RemoteAddressStrategy remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
Assert.assertEquals(remoteAddressStrategy, RemoteAddressStrategyFactory.NULL_NET_ADDRESS_STRATEGY);
plainAccessResource.setWhiteRemoteAddress("127.0.0.1");
remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
Assert.assertEquals(remoteAddressStrategy.getClass(), RemoteAddressStrategyFactory.OneRemoteAddressStrategy.class);
plainAccessResource.setWhiteRemoteAddress("127.0.0.1,127.0.0.2,127.0.0.3");
remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
Assert.assertEquals(remoteAddressStrategy.getClass(), RemoteAddressStrategyFactory.MultipleRemoteAddressStrategy.class);
plainAccessResource.setWhiteRemoteAddress("127.0.0.{1,2,3}");
remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
Assert.assertEquals(remoteAddressStrategy.getClass(), RemoteAddressStrategyFactory.MultipleRemoteAddressStrategy.class);
plainAccessResource.setWhiteRemoteAddress("127.0.0.1-200");
remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
Assert.assertEquals(remoteAddressStrategy.getClass(), RemoteAddressStrategyFactory.RangeRemoteAddressStrategy.class);
plainAccessResource.setWhiteRemoteAddress("127.0.0.*");
remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
Assert.assertEquals(remoteAddressStrategy.getClass(), RemoteAddressStrategyFactory.RangeRemoteAddressStrategy.class);
plainAccessResource.setWhiteRemoteAddress("127.0.1-20.*");
remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
Assert.assertEquals(remoteAddressStrategy.getClass(), RemoteAddressStrategyFactory.RangeRemoteAddressStrategy.class);
plainAccessResource.setWhiteRemoteAddress("");
remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
Assert.assertEquals(remoteAddressStrategy.getClass(), RemoteAddressStrategyFactory.BlankRemoteAddressStrategy.class);
}
@Test(expected = AclException.class)
public void verifyTest() {
PlainAccessResource plainAccessResource = new PlainAccessResource();
plainAccessResource.setWhiteRemoteAddress("127.0.0.1");
remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
plainAccessResource.setWhiteRemoteAddress("256.0.0.1");
remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
}
@Test
public void nullNetaddressStrategyTest() {
boolean isMatch = RemoteAddressStrategyFactory.NULL_NET_ADDRESS_STRATEGY.match(new PlainAccessResource());
Assert.assertTrue(isMatch);
}
@Test
public void blankNetaddressStrategyTest() {
boolean isMatch = RemoteAddressStrategyFactory.BLANK_NET_ADDRESS_STRATEGY.match(new PlainAccessResource());
Assert.assertFalse(isMatch);
}
public void oneNetaddressStrategyTest() {
PlainAccessResource plainAccessResource = new PlainAccessResource();
plainAccessResource.setWhiteRemoteAddress("127.0.0.1");
RemoteAddressStrategy remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
plainAccessResource.setWhiteRemoteAddress("");
boolean match = remoteAddressStrategy.match(plainAccessResource);
Assert.assertFalse(match);
plainAccessResource.setWhiteRemoteAddress("127.0.0.2");
match = remoteAddressStrategy.match(plainAccessResource);
Assert.assertFalse(match);
plainAccessResource.setWhiteRemoteAddress("127.0.0.1");
match = remoteAddressStrategy.match(plainAccessResource);
Assert.assertTrue(match);
}
@Test
public void multipleNetaddressStrategyTest() {
PlainAccessResource plainAccessResource = new PlainAccessResource();
plainAccessResource.setWhiteRemoteAddress("127.0.0.1,127.0.0.2,127.0.0.3");
RemoteAddressStrategy remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
multipleNetaddressStrategyTest(remoteAddressStrategy);
plainAccessResource.setWhiteRemoteAddress("127.0.0.{1,2,3}");
remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
multipleNetaddressStrategyTest(remoteAddressStrategy);
}
@Test(expected = AclException.class)
public void multipleNetaddressStrategyExceptionTest() {
PlainAccessResource plainAccessResource = new PlainAccessResource();
plainAccessResource.setWhiteRemoteAddress("127.0.0.1,2,3}");
remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
}
private void multipleNetaddressStrategyTest(RemoteAddressStrategy remoteAddressStrategy) {
PlainAccessResource plainAccessResource = new PlainAccessResource();
plainAccessResource.setWhiteRemoteAddress("127.0.0.1");
boolean match = remoteAddressStrategy.match(plainAccessResource);
Assert.assertTrue(match);
plainAccessResource.setWhiteRemoteAddress("127.0.0.2");
match = remoteAddressStrategy.match(plainAccessResource);
Assert.assertTrue(match);
plainAccessResource.setWhiteRemoteAddress("127.0.0.3");
match = remoteAddressStrategy.match(plainAccessResource);
Assert.assertTrue(match);
plainAccessResource.setWhiteRemoteAddress("127.0.0.4");
match = remoteAddressStrategy.match(plainAccessResource);
Assert.assertFalse(match);
plainAccessResource.setWhiteRemoteAddress("127.0.0.0");
match = remoteAddressStrategy.match(plainAccessResource);
Assert.assertFalse(match);
}
@Test
public void rangeNetaddressStrategyTest() {
String head = "127.0.0.";
PlainAccessResource plainAccessResource = new PlainAccessResource();
plainAccessResource.setWhiteRemoteAddress("127.0.0.1-200");
RemoteAddressStrategy remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
rangeNetaddressStrategyTest(remoteAddressStrategy, head, 1, 200, true);
plainAccessResource.setWhiteRemoteAddress("127.0.0.*");
remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
rangeNetaddressStrategyTest(remoteAddressStrategy, head, 0, 255, true);
plainAccessResource.setWhiteRemoteAddress("127.0.1-200.*");
remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
rangeNetaddressStrategyThirdlyTest(remoteAddressStrategy, head, 1, 200);
}
private void rangeNetaddressStrategyTest(RemoteAddressStrategy remoteAddressStrategy, String head, int start,
int end,
boolean isFalse) {
PlainAccessResource plainAccessResource = new PlainAccessResource();
for (int i = -10; i < 300; i++) {
plainAccessResource.setWhiteRemoteAddress(head + i);
boolean match = remoteAddressStrategy.match(plainAccessResource);
if (isFalse && i >= start && i <= end) {
Assert.assertTrue(match);
continue;
}
Assert.assertFalse(match);
}
}
private void rangeNetaddressStrategyThirdlyTest(RemoteAddressStrategy remoteAddressStrategy, String head, int start,
int end) {
String newHead;
for (int i = -10; i < 300; i++) {
newHead = head + i;
if (i >= start && i <= end) {
rangeNetaddressStrategyTest(remoteAddressStrategy, newHead, 0, 255, false);
}
}
}
@Test(expected = AclException.class)
public void rangeNetaddressStrategyExceptionStartGreaterEndTest() {
rangeNetaddressStrategyExceptionTest("127.0.0.2-1");
}
@Test(expected = AclException.class)
public void rangeNetaddressStrategyExceptionScopeTest() {
rangeNetaddressStrategyExceptionTest("127.0.0.-1-200");
}
@Test(expected = AclException.class)
public void rangeNetaddressStrategyExceptionScopeTwoTest() {
rangeNetaddressStrategyExceptionTest("127.0.0.0-256");
}
private void rangeNetaddressStrategyExceptionTest(String netaddress) {
PlainAccessResource plainAccessResource = new PlainAccessResource();
plainAccessResource.setWhiteRemoteAddress(netaddress);
remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
}
}
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
## suggested format
globalWhiteRemoteAddresses:
- 10.10.103.*
- 192.168.0.*
accounts:
- accessKey: RocketMQ
secretKey: 12345678
whiteRemoteAddress: 192.168.0.*
admin: false
defaultTopicPerm: DENY
defaultGroupPerm: SUB
topicPerms:
- topicA=DENY
- topicB=PUB|SUB
- topicC=SUB
groupPerms:
# the group should convert to retry topic
- groupA=DENY
- groupB=SUB
- groupC=SUB
- accessKey: rocketmq2
secretKey: 12345678
whiteRemoteAddress: 192.168.1.*
# if it is admin, it could access all resources
admin: true
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
## suggested format
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
## suggested format
accounts:
- accessKey: watchrocketmq
secretKey: 12345678
whiteRemoteAddress: 127.0.0.1
admin: true
- accessKey: watchrocketmq1
secretKey: 88888888
whiteRemoteAddress: 127.0.0.1
admin: false
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{5} - %msg%n</pattern>
<charset class="java.nio.charset.Charset">UTF-8</charset>
</encoder>
</appender>
<logger name="RocketmqCommon" level="INFO" additivity="false">
<appender-ref ref="STDOUT"/>
</logger>
<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
accounts:
- accessKey: rokcetmq
secretKey: aliyun11
whiteRemoteAddress: 127.0.0.1
admin: true
- accessKey: rokcet1
secretKey: aliyun1
whiteRemoteAddress: 127.0.0.1
admin: true
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册