提交 4250af6e 编写于 作者: D dongeforever

Merge branch 'feature_acl' into msg_track

......@@ -12,3 +12,4 @@ devenv
!LICENSE-BIN
.DS_Store
localbin
nohup.out
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. The ASF licenses this file to
You under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
xmlns="http://maven.apache.org/POM/4.0.0">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.4.0-SNAPSHOT</version>
</parent>
<artifactId>rocketmq-acl</artifactId>
<name>rocketmq-acl ${project.version}</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-remoting</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-logging</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-common</artifactId>
</dependency>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
</dependencies>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl;
public interface AccessResource {
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public interface AccessValidator {
/**
* Parse to get the AccessResource(user, resource, needed permission)
*
* @param request
* @return
*/
AccessResource parse(RemotingCommand request, String remoteAddr);
/**
* Validate the access resource.
*
* @param accessResource
*/
void validate(AccessResource accessResource);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.common;
import java.lang.reflect.Field;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import static org.apache.rocketmq.acl.common.SessionCredentials.ACCESS_KEY;
import static org.apache.rocketmq.acl.common.SessionCredentials.SECURITY_TOKEN;
import static org.apache.rocketmq.acl.common.SessionCredentials.SIGNATURE;
public class AclClientRPCHook implements RPCHook {
private final SessionCredentials sessionCredentials;
protected ConcurrentHashMap<Class<? extends CommandCustomHeader>, Field[]> fieldCache =
new ConcurrentHashMap<Class<? extends CommandCustomHeader>, Field[]>();
public AclClientRPCHook(SessionCredentials sessionCredentials) {
this.sessionCredentials = sessionCredentials;
}
@Override
public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
byte[] total = AclUtils.combineRequestContent(request,
parseRequestContent(request, sessionCredentials.getAccessKey(), sessionCredentials.getSecurityToken()));
String signature = AclUtils.calSignature(total, sessionCredentials.getSecretKey());
request.addExtField(SIGNATURE, signature);
request.addExtField(ACCESS_KEY, sessionCredentials.getAccessKey());
if (sessionCredentials.getSecurityToken() != null) {
request.addExtField(SECURITY_TOKEN, sessionCredentials.getSecurityToken());
}
}
@Override
public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
}
protected SortedMap<String, String> parseRequestContent(RemotingCommand request, String ak, String securityToken) {
CommandCustomHeader header = request.readCustomHeader();
// sort property
SortedMap<String, String> map = new TreeMap<String, String>();
map.put(ACCESS_KEY, ak);
if (securityToken != null) {
map.put(SECURITY_TOKEN, securityToken);
}
try {
// add header properties
if (null != header) {
Field[] fields = fieldCache.get(header.getClass());
if (null == fields) {
fields = header.getClass().getDeclaredFields();
for (Field field : fields) {
field.setAccessible(true);
}
Field[] tmp = fieldCache.putIfAbsent(header.getClass(), fields);
if (null != tmp) {
fields = tmp;
}
}
for (Field field : fields) {
Object value = field.get(header);
if (null != value && !field.isSynthetic()) {
map.put(field.getName(), value.toString());
}
}
}
return map;
} catch (Exception e) {
throw new RuntimeException("incompatible exception.", e);
}
}
public SessionCredentials getSessionCredentials() {
return sessionCredentials;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.common;
public class AclException extends RuntimeException {
private static final long serialVersionUID = 1L;
private String status;
private int code;
public AclException(String status, int code) {
super();
this.status = status;
this.code = code;
}
public AclException(String status, int code, String message) {
super(message);
this.status = status;
this.code = code;
}
public AclException(String status, int code, Throwable throwable) {
super(throwable);
this.status = status;
this.code = code;
}
public AclException(String message) {
super(message);
}
public AclException(String message, Throwable throwable) {
super(message, throwable);
}
public AclException(String status, int code, String message, Throwable throwable) {
super(message, throwable);
this.status = status;
this.code = code;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.common;
import java.nio.charset.Charset;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import org.apache.commons.codec.binary.Base64;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
public class AclSigner {
public static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");
public static final SigningAlgorithm DEFAULT_ALGORITHM = SigningAlgorithm.HmacSHA1;
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.ROCKETMQ_AUTHORIZE_LOGGER_NAME);
private static final int CAL_SIGNATURE_FAILED = 10015;
private static final String CAL_SIGNATURE_FAILED_MSG = "[%s:signature-failed] unable to calculate a request signature. error=%s";
public static String calSignature(String data, String key) throws AclException {
return calSignature(data, key, DEFAULT_ALGORITHM, DEFAULT_CHARSET);
}
public static String calSignature(String data, String key, SigningAlgorithm algorithm,
Charset charset) throws AclException {
return signAndBase64Encode(data, key, algorithm, charset);
}
private static String signAndBase64Encode(String data, String key, SigningAlgorithm algorithm, Charset charset)
throws AclException {
try {
byte[] signature = sign(data.getBytes(charset), key.getBytes(charset), algorithm);
return new String(Base64.encodeBase64(signature), DEFAULT_CHARSET);
} catch (Exception e) {
String message = String.format(CAL_SIGNATURE_FAILED_MSG, CAL_SIGNATURE_FAILED, e.getMessage());
log.error(message, e);
throw new AclException("CAL_SIGNATURE_FAILED", CAL_SIGNATURE_FAILED, message, e);
}
}
private static byte[] sign(byte[] data, byte[] key, SigningAlgorithm algorithm) throws AclException {
try {
Mac mac = Mac.getInstance(algorithm.toString());
mac.init(new SecretKeySpec(key, algorithm.toString()));
return mac.doFinal(data);
} catch (Exception e) {
String message = String.format(CAL_SIGNATURE_FAILED_MSG, CAL_SIGNATURE_FAILED, e.getMessage());
log.error(message, e);
throw new AclException("CAL_SIGNATURE_FAILED", CAL_SIGNATURE_FAILED, message, e);
}
}
public static String calSignature(byte[] data, String key) throws AclException {
return calSignature(data, key, DEFAULT_ALGORITHM, DEFAULT_CHARSET);
}
public static String calSignature(byte[] data, String key, SigningAlgorithm algorithm,
Charset charset) throws AclException {
return signAndBase64Encode(data, key, algorithm, charset);
}
private static String signAndBase64Encode(byte[] data, String key, SigningAlgorithm algorithm, Charset charset)
throws AclException {
try {
byte[] signature = sign(data, key.getBytes(charset), algorithm);
return new String(Base64.encodeBase64(signature), DEFAULT_CHARSET);
} catch (Exception e) {
String message = String.format(CAL_SIGNATURE_FAILED_MSG, CAL_SIGNATURE_FAILED, e.getMessage());
log.error(message, e);
throw new AclException("CAL_SIGNATURE_FAILED", CAL_SIGNATURE_FAILED, message, e);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.common;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Map;
import java.util.SortedMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.yaml.snakeyaml.Yaml;
import static org.apache.rocketmq.acl.common.SessionCredentials.CHARSET;
public class AclUtils {
public static byte[] combineRequestContent(RemotingCommand request, SortedMap<String, String> fieldsMap) {
try {
StringBuilder sb = new StringBuilder("");
for (Map.Entry<String, String> entry : fieldsMap.entrySet()) {
if (!SessionCredentials.SIGNATURE.equals(entry.getKey())) {
sb.append(entry.getValue());
}
}
return AclUtils.combineBytes(sb.toString().getBytes(CHARSET), request.getBody());
} catch (Exception e) {
throw new RuntimeException("incompatible exception.", e);
}
}
public static byte[] combineBytes(byte[] b1, byte[] b2) {
int size = (null != b1 ? b1.length : 0) + (null != b2 ? b2.length : 0);
byte[] total = new byte[size];
if (null != b1)
System.arraycopy(b1, 0, total, 0, b1.length);
if (null != b2)
System.arraycopy(b2, 0, total, b1.length, b2.length);
return total;
}
public static String calSignature(byte[] data, String secretKey) {
String signature = AclSigner.calSignature(data, secretKey);
return signature;
}
public static void verify(String netaddress, int index) {
if (!AclUtils.isScope(netaddress, index)) {
throw new AclException(String.format("netaddress examine scope Exception netaddress is %s", netaddress));
}
}
public static String[] getAddreeStrArray(String netaddress, String four) {
String[] fourStrArray = StringUtils.split(four.substring(1, four.length() - 1), ",");
String address = netaddress.substring(0, netaddress.indexOf("{"));
String[] addreeStrArray = new String[fourStrArray.length];
for (int i = 0; i < fourStrArray.length; i++) {
addreeStrArray[i] = address + fourStrArray[i];
}
return addreeStrArray;
}
public static boolean isScope(String num, int index) {
String[] strArray = StringUtils.split(num, ".");
if (strArray.length != 4) {
return false;
}
return isScope(strArray, index);
}
public static boolean isScope(String[] num, int index) {
if (num.length <= index) {
}
for (int i = 0; i < index; i++) {
if (!isScope(num[i])) {
return false;
}
}
return true;
}
public static boolean isScope(String num) {
return isScope(Integer.valueOf(num.trim()));
}
public static boolean isScope(int num) {
return num >= 0 && num <= 255;
}
public static boolean isAsterisk(String asterisk) {
return asterisk.indexOf('*') > -1;
}
public static boolean isColon(String colon) {
return colon.indexOf(',') > -1;
}
public static boolean isMinus(String minus) {
return minus.indexOf('-') > -1;
}
public static <T> T getYamlDataObject(String path, Class<T> clazz) {
Yaml ymal = new Yaml();
FileInputStream fis = null;
try {
fis = new FileInputStream(new File(path));
return ymal.loadAs(fis, clazz);
} catch (Exception e) {
throw new AclException(String.format("The file for Plain mode was not found , paths %s", path), e);
} finally {
if (fis != null) {
try {
fis.close();
} catch (IOException e) {
throw new AclException("close transport fileInputStream Exception", e);
}
}
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.common;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.plain.PlainAccessResource;
import org.apache.rocketmq.common.protocol.RequestCode;
public class Permission {
public static final byte DENY = 1;
public static final byte ANY = 1 << 1;
public static final byte PUB = 1 << 2;
public static final byte SUB = 1 << 3;
public static final Set<Integer> ADMIN_CODE = new HashSet<Integer>();
static {
// UPDATE_AND_CREATE_TOPIC
ADMIN_CODE.add(RequestCode.UPDATE_AND_CREATE_TOPIC);
// UPDATE_BROKER_CONFIG
ADMIN_CODE.add(RequestCode.UPDATE_BROKER_CONFIG);
// DELETE_TOPIC_IN_BROKER
ADMIN_CODE.add(RequestCode.DELETE_TOPIC_IN_BROKER);
// UPDATE_AND_CREATE_SUBSCRIPTIONGROUP
ADMIN_CODE.add(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP);
// DELETE_SUBSCRIPTIONGROUP
ADMIN_CODE.add(RequestCode.DELETE_SUBSCRIPTIONGROUP);
}
public static boolean checkPermission(byte neededPerm, byte ownedPerm) {
if ((ownedPerm & DENY) > 0) {
return false;
}
if ((neededPerm & ANY) > 0) {
return ((ownedPerm & PUB) > 0) || ((ownedPerm & SUB) > 0);
}
return (neededPerm & ownedPerm) > 0;
}
public static byte parsePermFromString(String permString) {
if (permString == null) {
return Permission.DENY;
}
switch (permString.trim()) {
case "PUB":
return Permission.PUB;
case "SUB":
return Permission.SUB;
case "PUB|SUB":
return Permission.PUB | Permission.SUB;
case "SUB|PUB":
return Permission.PUB | Permission.SUB;
case "DENY":
return Permission.DENY;
default:
return Permission.DENY;
}
}
public static void parseResourcePerms(PlainAccessResource plainAccessResource, Boolean isTopic,
List<String> resources) {
if (resources == null || resources.isEmpty()) {
return;
}
for (String resource : resources) {
String[] items = StringUtils.split(resource, "=");
if (items.length == 2) {
plainAccessResource.addResourceAndPerm(isTopic ? items[0].trim() : PlainAccessResource.getRetryTopic(items[0].trim()), parsePermFromString(items[1].trim()));
} else {
throw new AclException(String.format("Parse resource permission failed for %s:%s", isTopic ? "topic" : "group", resource));
}
}
}
public static boolean needAdminPerm(Integer code) {
return ADMIN_CODE.contains(code);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.common;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Properties;
import org.apache.rocketmq.common.MixAll;
public class SessionCredentials {
public static final Charset CHARSET = Charset.forName("UTF-8");
public static final String ACCESS_KEY = "AccessKey";
public static final String SECRET_KEY = "SecretKey";
public static final String SIGNATURE = "Signature";
public static final String SECURITY_TOKEN = "SecurityToken";
public static final String KEY_FILE = System.getProperty("rocketmq.client.keyFile",
System.getProperty("user.home") + File.separator + "key");
private String accessKey;
private String secretKey;
private String securityToken;
private String signature;
public SessionCredentials() {
String keyContent = null;
try {
keyContent = MixAll.file2String(KEY_FILE);
} catch (IOException ignore) {
}
if (keyContent != null) {
Properties prop = MixAll.string2Properties(keyContent);
if (prop != null) {
this.updateContent(prop);
}
}
}
public SessionCredentials(String accessKey, String secretKey) {
this.accessKey = accessKey;
this.secretKey = secretKey;
}
public SessionCredentials(String accessKey, String secretKey, String securityToken) {
this(accessKey, secretKey);
this.securityToken = securityToken;
}
public void updateContent(Properties prop) {
{
String value = prop.getProperty(ACCESS_KEY);
if (value != null) {
this.accessKey = value.trim();
}
}
{
String value = prop.getProperty(SECRET_KEY);
if (value != null) {
this.secretKey = value.trim();
}
}
{
String value = prop.getProperty(SECURITY_TOKEN);
if (value != null) {
this.securityToken = value.trim();
}
}
}
public String getAccessKey() {
return accessKey;
}
public void setAccessKey(String accessKey) {
this.accessKey = accessKey;
}
public String getSecretKey() {
return secretKey;
}
public void setSecretKey(String secretKey) {
this.secretKey = secretKey;
}
public String getSignature() {
return signature;
}
public void setSignature(String signature) {
this.signature = signature;
}
public String getSecurityToken() {
return securityToken;
}
public void setSecurityToken(final String securityToken) {
this.securityToken = securityToken;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((accessKey == null) ? 0 : accessKey.hashCode());
result = prime * result + ((secretKey == null) ? 0 : secretKey.hashCode());
result = prime * result + ((signature == null) ? 0 : signature.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
SessionCredentials other = (SessionCredentials) obj;
if (accessKey == null) {
if (other.accessKey != null)
return false;
} else if (!accessKey.equals(other.accessKey))
return false;
if (secretKey == null) {
if (other.secretKey != null)
return false;
} else if (!secretKey.equals(other.secretKey))
return false;
if (signature == null) {
if (other.signature != null)
return false;
} else if (!signature.equals(other.signature))
return false;
return true;
}
@Override
public String toString() {
return "SessionCredentials [accessKey=" + accessKey + ", secretKey=" + secretKey + ", signature="
+ signature + ", SecurityToken=" + securityToken + "]";
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.common;//package com.aliyun.openservices.ons.api.impl.rocketmq.spas;
public enum SigningAlgorithm {
HmacSHA1,
HmacSHA256,
HmacMD5;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plain;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.rocketmq.acl.AccessResource;
import org.apache.rocketmq.common.MixAll;
public class PlainAccessResource implements AccessResource {
//identify the user
private String accessKey;
private String secretKey;
private String whiteRemoteAddress;
private boolean admin;
private byte defaultTopicPerm = 1;
private byte defaultGroupPerm = 1;
private Map<String, Byte> resourcePermMap;
private RemoteAddressStrategy remoteAddressStrategy;
private int requestCode;
//the content to calculate the content
private byte[] content;
private String signature;
private String secretToken;
private String recognition;
public PlainAccessResource() {
}
public static boolean isRetryTopic(String topic) {
return null != topic && topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX);
}
public static String printStr(String resource, boolean isGroup) {
if (resource == null) {
return null;
}
if (isGroup) {
return String.format("%s:%s", "group", getGroupFromRetryTopic(resource));
} else {
return String.format("%s:%s", "topic", resource);
}
}
public static String getGroupFromRetryTopic(String retryTopic) {
if (retryTopic == null) {
return null;
}
return retryTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
}
public static String getRetryTopic(String group) {
if (group == null) {
return null;
}
return MixAll.getRetryTopic(group);
}
public void addResourceAndPerm(String resource, byte perm) {
if (resource == null) {
return;
}
if (resourcePermMap == null) {
resourcePermMap = new HashMap<>();
}
resourcePermMap.put(resource, perm);
}
public String getAccessKey() {
return accessKey;
}
public void setAccessKey(String accessKey) {
this.accessKey = accessKey;
}
public String getSecretKey() {
return secretKey;
}
public void setSecretKey(String secretKey) {
this.secretKey = secretKey;
}
public String getWhiteRemoteAddress() {
return whiteRemoteAddress;
}
public void setWhiteRemoteAddress(String whiteRemoteAddress) {
this.whiteRemoteAddress = whiteRemoteAddress;
}
public boolean isAdmin() {
return admin;
}
public void setAdmin(boolean admin) {
this.admin = admin;
}
public byte getDefaultTopicPerm() {
return defaultTopicPerm;
}
public void setDefaultTopicPerm(byte defaultTopicPerm) {
this.defaultTopicPerm = defaultTopicPerm;
}
public byte getDefaultGroupPerm() {
return defaultGroupPerm;
}
public void setDefaultGroupPerm(byte defaultGroupPerm) {
this.defaultGroupPerm = defaultGroupPerm;
}
public Map<String, Byte> getResourcePermMap() {
return resourcePermMap;
}
public String getRecognition() {
return recognition;
}
public void setRecognition(String recognition) {
this.recognition = recognition;
}
public int getRequestCode() {
return requestCode;
}
public void setRequestCode(int requestCode) {
this.requestCode = requestCode;
}
public String getSecretToken() {
return secretToken;
}
public void setSecretToken(String secretToken) {
this.secretToken = secretToken;
}
public RemoteAddressStrategy getRemoteAddressStrategy() {
return remoteAddressStrategy;
}
public void setRemoteAddressStrategy(RemoteAddressStrategy remoteAddressStrategy) {
this.remoteAddressStrategy = remoteAddressStrategy;
}
public String getSignature() {
return signature;
}
public void setSignature(String signature) {
this.signature = signature;
}
@Override
public String toString() {
return ToStringBuilder.reflectionToString(this);
}
public byte[] getContent() {
return content;
}
public void setContent(byte[] content) {
this.content = content;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plain;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.rocketmq.acl.AccessResource;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.acl.common.AclException;
import org.apache.rocketmq.acl.common.AclUtils;
import org.apache.rocketmq.acl.common.Permission;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader;
import org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import static org.apache.rocketmq.acl.plain.PlainAccessResource.getRetryTopic;
public class PlainAccessValidator implements AccessValidator {
private PlainPermissionLoader aclPlugEngine;
public PlainAccessValidator() {
aclPlugEngine = new PlainPermissionLoader();
}
@Override
public AccessResource parse(RemotingCommand request, String remoteAddr) {
PlainAccessResource accessResource = new PlainAccessResource();
if (remoteAddr != null && remoteAddr.contains(":")) {
accessResource.setWhiteRemoteAddress(remoteAddr.split(":")[0]);
} else {
accessResource.setWhiteRemoteAddress(remoteAddr);
}
accessResource.setRequestCode(request.getCode());
accessResource.setAccessKey(request.getExtFields().get(SessionCredentials.ACCESS_KEY));
accessResource.setSignature(request.getExtFields().get(SessionCredentials.SIGNATURE));
accessResource.setSecretToken(request.getExtFields().get(SessionCredentials.SECURITY_TOKEN));
try {
switch (request.getCode()) {
case RequestCode.SEND_MESSAGE:
accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.PUB);
break;
case RequestCode.SEND_MESSAGE_V2:
accessResource.addResourceAndPerm(request.getExtFields().get("b"), Permission.PUB);
break;
case RequestCode.CONSUMER_SEND_MSG_BACK:
accessResource.addResourceAndPerm(request.getExtFields().get("originTopic"), Permission.PUB);
accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("group")), Permission.SUB);
break;
case RequestCode.PULL_MESSAGE:
accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB);
accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("consumerGroup")), Permission.SUB);
break;
case RequestCode.QUERY_MESSAGE:
accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB);
break;
case RequestCode.HEART_BEAT:
HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
accessResource.addResourceAndPerm(getRetryTopic(data.getGroupName()), Permission.SUB);
for (SubscriptionData subscriptionData : data.getSubscriptionDataSet()) {
accessResource.addResourceAndPerm(subscriptionData.getTopic(), Permission.SUB);
}
}
break;
case RequestCode.UNREGISTER_CLIENT:
final UnregisterClientRequestHeader unregisterClientRequestHeader =
(UnregisterClientRequestHeader) request
.decodeCommandCustomHeader(UnregisterClientRequestHeader.class);
accessResource.addResourceAndPerm(getRetryTopic(unregisterClientRequestHeader.getConsumerGroup()), Permission.SUB);
break;
case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
final GetConsumerListByGroupRequestHeader getConsumerListByGroupRequestHeader =
(GetConsumerListByGroupRequestHeader) request
.decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);
accessResource.addResourceAndPerm(getRetryTopic(getConsumerListByGroupRequestHeader.getConsumerGroup()), Permission.SUB);
break;
case RequestCode.UPDATE_CONSUMER_OFFSET:
final UpdateConsumerOffsetRequestHeader updateConsumerOffsetRequestHeader =
(UpdateConsumerOffsetRequestHeader) request
.decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
accessResource.addResourceAndPerm(getRetryTopic(updateConsumerOffsetRequestHeader.getConsumerGroup()), Permission.SUB);
accessResource.addResourceAndPerm(updateConsumerOffsetRequestHeader.getTopic(), Permission.SUB);
break;
default:
break;
}
} catch (Throwable t) {
throw new AclException(t.getMessage(), t);
}
// content
SortedMap<String, String> map = new TreeMap<String, String>();
for (Map.Entry<String, String> entry : request.getExtFields().entrySet()) {
if (!SessionCredentials.SIGNATURE.equals(entry.getKey())) {
map.put(entry.getKey(), entry.getValue());
}
}
accessResource.setContent(AclUtils.combineRequestContent(request, map));
return accessResource;
}
@Override
public void validate(AccessResource accessResource) {
aclPlugEngine.validate((PlainAccessResource) accessResource);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plain;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclException;
import org.apache.rocketmq.acl.common.AclUtils;
import org.apache.rocketmq.acl.common.Permission;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
public class PlainPermissionLoader {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.ACL_PLUG_LOGGER_NAME);
private static final String DEFAULT_PLAIN_ACL_FILE = "/conf/plain_acl.yml";
private String fileHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY,
System.getenv(MixAll.ROCKETMQ_HOME_ENV));
private String fileName = System.getProperty("rocketmq.acl.plain.file", DEFAULT_PLAIN_ACL_FILE);
private Map<String/** AccessKey **/, PlainAccessResource> plainAccessResourceMap = new HashMap<>();
private List<RemoteAddressStrategy> globalWhiteRemoteAddressStrategy = new ArrayList<>();
private RemoteAddressStrategyFactory remoteAddressStrategyFactory = new RemoteAddressStrategyFactory();
private boolean isWatchStart;
public PlainPermissionLoader() {
initialize();
watch();
}
public void initialize() {
JSONObject accessControlTransport = AclUtils.getYamlDataObject(fileHome + fileName,
JSONObject.class);
if (accessControlTransport == null || accessControlTransport.isEmpty()) {
throw new AclException(String.format("%s file is not data", fileHome + fileName));
}
log.info("BorkerAccessControlTransport data is : ", accessControlTransport.toString());
JSONArray globalWhiteRemoteAddressesList = accessControlTransport.getJSONArray("globalWhiteRemoteAddresses");
if (globalWhiteRemoteAddressesList != null && !globalWhiteRemoteAddressesList.isEmpty()) {
for (int i = 0; i < globalWhiteRemoteAddressesList.size(); i++) {
addGlobalWhiteRemoteAddress(globalWhiteRemoteAddressesList.getString(i));
}
}
JSONArray accounts = accessControlTransport.getJSONArray("accounts");
if (accounts != null && !accounts.isEmpty()) {
List<PlainAccessConfig> plainAccessList = accounts.toJavaList(PlainAccessConfig.class);
for (PlainAccessConfig plainAccess : plainAccessList) {
this.addPlainAccessResource(getPlainAccessResource(plainAccess));
}
}
}
private void watch() {
String version = System.getProperty("java.version");
String[] str = StringUtils.split(version, ".");
if (Integer.valueOf(str[1]) < 7) {
log.warn("Watch need jdk equal or greater than 1.7, current version is {}", str[1]);
return;
}
try {
int fileIndex = fileName.lastIndexOf("/") + 1;
String watchDirectory = fileName.substring(0, fileIndex);
final String watchFileName = fileName.substring(fileIndex);
log.info("watch directory is {} , watch directory file name is {} ", fileHome + watchDirectory, watchFileName);
final WatchService watcher = FileSystems.getDefault().newWatchService();
Path p = Paths.get(fileHome + watchDirectory);
p.register(watcher, StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.ENTRY_CREATE);
ServiceThread watcherServcie = new ServiceThread() {
public void run() {
while (true) {
try {
WatchKey watchKey = watcher.take();
List<WatchEvent<?>> watchEvents = watchKey.pollEvents();
for (WatchEvent<?> event : watchEvents) {
if (watchFileName.equals(event.context().toString())
&& (StandardWatchEventKinds.ENTRY_MODIFY.equals(event.kind())
|| StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind()))) {
log.info("{} make a difference change is : {}", watchFileName, event.toString());
PlainPermissionLoader.this.clearPermissionInfo();
initialize();
}
}
watchKey.reset();
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
UtilAll.sleep(3000);
}
}
}
@Override
public String getServiceName() {
return "AclWatcherService";
}
};
watcherServcie.start();
log.info("Succeed to start AclWatcherService");
this.isWatchStart = true;
} catch (IOException e) {
log.error("Failed to start AclWatcherService", e);
}
}
PlainAccessResource getPlainAccessResource(PlainAccessConfig plainAccess) {
PlainAccessResource plainAccessResource = new PlainAccessResource();
plainAccessResource.setAccessKey(plainAccess.getAccessKey());
plainAccessResource.setSecretKey(plainAccess.getSecretKey());
plainAccessResource.setWhiteRemoteAddress(plainAccess.getWhiteRemoteAddress());
plainAccessResource.setAdmin(plainAccess.isAdmin());
plainAccessResource.setDefaultGroupPerm(Permission.parsePermFromString(plainAccess.getDefaultGroupPerm()));
plainAccessResource.setDefaultTopicPerm(Permission.parsePermFromString(plainAccess.getDefaultTopicPerm()));
Permission.parseResourcePerms(plainAccessResource, false, plainAccess.getGroupPerms());
Permission.parseResourcePerms(plainAccessResource, true, plainAccess.getTopicPerms());
return plainAccessResource;
}
void checkPerm(PlainAccessResource needCheckedAccess, PlainAccessResource ownedAccess) {
if (Permission.needAdminPerm(needCheckedAccess.getRequestCode()) && !ownedAccess.isAdmin()) {
throw new AclException(String.format("Need admin permission for request code=%d, but accessKey=%s is not", needCheckedAccess.getRequestCode(), ownedAccess.getAccessKey()));
}
Map<String, Byte> needCheckedPermMap = needCheckedAccess.getResourcePermMap();
Map<String, Byte> ownedPermMap = ownedAccess.getResourcePermMap();
if (needCheckedPermMap == null) {
//if the needCheckedPermMap is null,then return
return;
}
for (Map.Entry<String, Byte> needCheckedEntry : needCheckedPermMap.entrySet()) {
String resource = needCheckedEntry.getKey();
Byte neededPerm = needCheckedEntry.getValue();
boolean isGroup = PlainAccessResource.isRetryTopic(resource);
if (!ownedPermMap.containsKey(resource)) {
//Check the default perm
byte ownedPerm = isGroup ? needCheckedAccess.getDefaultGroupPerm() :
needCheckedAccess.getDefaultTopicPerm();
if (!Permission.checkPermission(neededPerm, ownedPerm)) {
throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup)));
}
continue;
}
if (!Permission.checkPermission(neededPerm, ownedPermMap.get(resource))) {
throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup)));
}
}
}
void clearPermissionInfo() {
this.plainAccessResourceMap.clear();
this.globalWhiteRemoteAddressStrategy.clear();
}
public void addPlainAccessResource(PlainAccessResource plainAccessResource) throws AclException {
if (plainAccessResource.getAccessKey() == null
|| plainAccessResource.getSecretKey() == null
|| plainAccessResource.getAccessKey().length() <= 6
|| plainAccessResource.getSecretKey().length() <= 6) {
throw new AclException(String.format(
"The accessKey=%s and secretKey=%s cannot be null and length should longer than 6",
plainAccessResource.getAccessKey(), plainAccessResource.getSecretKey()));
}
try {
RemoteAddressStrategy remoteAddressStrategy = remoteAddressStrategyFactory
.getRemoteAddressStrategy(plainAccessResource);
plainAccessResource.setRemoteAddressStrategy(remoteAddressStrategy);
if (plainAccessResourceMap.containsKey(plainAccessResource.getAccessKey())) {
log.warn("Duplicate acl config for {}, the newly one may overwrite the old", plainAccessResource.getAccessKey());
}
plainAccessResourceMap.put(plainAccessResource.getAccessKey(), plainAccessResource);
} catch (Exception e) {
throw new AclException(String.format("Load plain access resource failed %s %s", e.getMessage(), plainAccessResource.toString()), e);
}
}
private void addGlobalWhiteRemoteAddress(String remoteAddresses) {
globalWhiteRemoteAddressStrategy.add(remoteAddressStrategyFactory.getRemoteAddressStrategy(remoteAddresses));
}
public void validate(PlainAccessResource plainAccessResource) {
//Step 1, check the global white remote addr
for (RemoteAddressStrategy remoteAddressStrategy : globalWhiteRemoteAddressStrategy) {
if (remoteAddressStrategy.match(plainAccessResource)) {
return;
}
}
if (plainAccessResource.getAccessKey() == null) {
throw new AclException(String.format("No accessKey is configured"));
}
if (!plainAccessResourceMap.containsKey(plainAccessResource.getAccessKey())) {
throw new AclException(String.format("No acl config for %s", plainAccessResource.getAccessKey()));
}
//Step 2, check the white addr for accesskey
PlainAccessResource ownedAccess = plainAccessResourceMap.get(plainAccessResource.getAccessKey());
if (ownedAccess.getRemoteAddressStrategy().match(plainAccessResource)) {
return;
}
//Step 3, check the signature
String signature = AclUtils.calSignature(plainAccessResource.getContent(), ownedAccess.getSecretKey());
if (!signature.equals(plainAccessResource.getSignature())) {
throw new AclException(String.format("Check signature failed for accessKey=%s", plainAccessResource.getAccessKey()));
}
//Step 4, check perm of each resource
checkPerm(plainAccessResource, ownedAccess);
}
public boolean isWatchStart() {
return isWatchStart;
}
static class PlainAccessConfig {
private String accessKey;
private String secretKey;
private String whiteRemoteAddress;
private boolean admin;
private String defaultTopicPerm;
private String defaultGroupPerm;
private List<String> topicPerms;
private List<String> groupPerms;
public String getAccessKey() {
return accessKey;
}
public void setAccessKey(String accessKey) {
this.accessKey = accessKey;
}
public String getSecretKey() {
return secretKey;
}
public void setSecretKey(String secretKey) {
this.secretKey = secretKey;
}
public String getWhiteRemoteAddress() {
return whiteRemoteAddress;
}
public void setWhiteRemoteAddress(String whiteRemoteAddress) {
this.whiteRemoteAddress = whiteRemoteAddress;
}
public boolean isAdmin() {
return admin;
}
public void setAdmin(boolean admin) {
this.admin = admin;
}
public String getDefaultTopicPerm() {
return defaultTopicPerm;
}
public void setDefaultTopicPerm(String defaultTopicPerm) {
this.defaultTopicPerm = defaultTopicPerm;
}
public String getDefaultGroupPerm() {
return defaultGroupPerm;
}
public void setDefaultGroupPerm(String defaultGroupPerm) {
this.defaultGroupPerm = defaultGroupPerm;
}
public List<String> getTopicPerms() {
return topicPerms;
}
public void setTopicPerms(List<String> topicPerms) {
this.topicPerms = topicPerms;
}
public List<String> getGroupPerms() {
return groupPerms;
}
public void setGroupPerms(List<String> groupPerms) {
this.groupPerms = groupPerms;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plain;
public interface RemoteAddressStrategy {
public boolean match(PlainAccessResource plainAccessResource);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plain;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclException;
import org.apache.rocketmq.acl.common.AclUtils;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
public class RemoteAddressStrategyFactory {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.ACL_PLUG_LOGGER_NAME);
public static final NullRemoteAddressStrategy NULL_NET_ADDRESS_STRATEGY = new NullRemoteAddressStrategy();
public static final BlankRemoteAddressStrategy BLANK_NET_ADDRESS_STRATEGY = new BlankRemoteAddressStrategy();
public RemoteAddressStrategy getRemoteAddressStrategy(PlainAccessResource plainAccessResource) {
return getRemoteAddressStrategy(plainAccessResource.getWhiteRemoteAddress());
}
public RemoteAddressStrategy getRemoteAddressStrategy(String remoteAddr) {
if (StringUtils.isBlank(remoteAddr)) {
log.warn("white list address is null");
return BLANK_NET_ADDRESS_STRATEGY;
}
if ("*".equals(remoteAddr)) {
return NULL_NET_ADDRESS_STRATEGY;
}
if (remoteAddr.endsWith("}")) {
String[] strArray = StringUtils.split(remoteAddr, ".");
String four = strArray[3];
if (!four.startsWith("{")) {
throw new AclException(String.format("MultipleRemoteAddressStrategy netaddress examine scope Exception netaddress", remoteAddr));
}
return new MultipleRemoteAddressStrategy(AclUtils.getAddreeStrArray(remoteAddr, four));
} else if (AclUtils.isColon(remoteAddr)) {
return new MultipleRemoteAddressStrategy(StringUtils.split(remoteAddr, ","));
} else if (AclUtils.isAsterisk(remoteAddr) || AclUtils.isMinus(remoteAddr)) {
return new RangeRemoteAddressStrategy(remoteAddr);
}
return new OneRemoteAddressStrategy(remoteAddr);
}
public static class NullRemoteAddressStrategy implements RemoteAddressStrategy {
@Override
public boolean match(PlainAccessResource plainAccessResource) {
return true;
}
}
public static class BlankRemoteAddressStrategy implements RemoteAddressStrategy {
@Override
public boolean match(PlainAccessResource plainAccessResource) {
return false;
}
}
public static class MultipleRemoteAddressStrategy implements RemoteAddressStrategy {
private final Set<String> multipleSet = new HashSet<>();
public MultipleRemoteAddressStrategy(String[] strArray) {
for (String netaddress : strArray) {
AclUtils.verify(netaddress, 4);
multipleSet.add(netaddress);
}
}
@Override
public boolean match(PlainAccessResource plainAccessResource) {
return multipleSet.contains(plainAccessResource.getWhiteRemoteAddress());
}
}
public static class OneRemoteAddressStrategy implements RemoteAddressStrategy {
private String netaddress;
public OneRemoteAddressStrategy(String netaddress) {
this.netaddress = netaddress;
AclUtils.verify(netaddress, 4);
}
@Override
public boolean match(PlainAccessResource plainAccessResource) {
return netaddress.equals(plainAccessResource.getWhiteRemoteAddress());
}
}
public static class RangeRemoteAddressStrategy implements RemoteAddressStrategy {
private String head;
private int start;
private int end;
private int index;
public RangeRemoteAddressStrategy(String remoteAddr) {
String[] strArray = StringUtils.split(remoteAddr, ".");
if (analysis(strArray, 2) || analysis(strArray, 3)) {
AclUtils.verify(remoteAddr, index - 1);
StringBuffer sb = new StringBuffer().append(strArray[0].trim()).append(".").append(strArray[1].trim()).append(".");
if (index == 3) {
sb.append(strArray[2].trim()).append(".");
}
this.head = sb.toString();
}
}
private boolean analysis(String[] strArray, int index) {
String value = strArray[index].trim();
this.index = index;
if ("*".equals(value)) {
setValue(0, 255);
} else if (AclUtils.isMinus(value)) {
if (value.indexOf("-") == 0) {
throw new AclException(String.format("RangeRemoteAddressStrategy netaddress examine scope Exception value %s ", value));
}
String[] valueArray = StringUtils.split(value, "-");
this.start = Integer.valueOf(valueArray[0]);
this.end = Integer.valueOf(valueArray[1]);
if (!(AclUtils.isScope(end) && AclUtils.isScope(start) && start <= end)) {
throw new AclException(String.format("RangeRemoteAddressStrategy netaddress examine scope Exception start is %s , end is %s", start, end));
}
}
return this.end > 0 ? true : false;
}
private void setValue(int start, int end) {
this.start = start;
this.end = end;
}
@Override
public boolean match(PlainAccessResource plainAccessResource) {
String netAddress = plainAccessResource.getWhiteRemoteAddress();
if (netAddress.startsWith(this.head)) {
String value;
if (index == 3) {
value = netAddress.substring(this.head.length());
} else {
value = netAddress.substring(this.head.length(), netAddress.lastIndexOf('.'));
}
Integer address = Integer.valueOf(value);
if (address >= this.start && address <= this.end) {
return true;
}
}
return false;
}
}
}
package org.apache.rocketmq.acl.common;
import org.junit.Test;
public class AclSignerTest {
@Test(expected = Exception.class)
public void calSignatureExceptionTest(){
AclSigner.calSignature(new byte[]{},"");
}
@Test
public void calSignatureTest(){
AclSigner.calSignature("RocketMQ","12345678");
AclSigner.calSignature("RocketMQ".getBytes(),"12345678");
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.common;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.junit.Assert;
import org.junit.Test;
public class AclUtilsTest {
@Test
public void getAddreeStrArray() {
String address = "1.1.1.{1,2,3,4}";
String[] addressArray = AclUtils.getAddreeStrArray(address, "{1,2,3,4}");
List<String> newAddressList = new ArrayList<>();
for (String a : addressArray) {
newAddressList.add(a);
}
List<String> addressList = new ArrayList<>();
addressList.add("1.1.1.1");
addressList.add("1.1.1.2");
addressList.add("1.1.1.3");
addressList.add("1.1.1.4");
Assert.assertEquals(newAddressList, addressList);
}
@Test
public void isScopeStringArray() {
String adderss = "12";
for (int i = 0; i < 6; i++) {
boolean isScope = AclUtils.isScope(adderss, 4);
if (i == 3) {
Assert.assertTrue(isScope);
} else {
Assert.assertFalse(isScope);
}
adderss = adderss + ".12";
}
}
@Test
public void isScopeArray() {
String[] adderss = StringUtils.split("12.12.12.12", ".");
boolean isScope = AclUtils.isScope(adderss, 4);
Assert.assertTrue(isScope);
isScope = AclUtils.isScope(adderss, 3);
Assert.assertTrue(isScope);
adderss = StringUtils.split("12.12.1222.1222", ".");
isScope = AclUtils.isScope(adderss, 4);
Assert.assertFalse(isScope);
isScope = AclUtils.isScope(adderss, 3);
Assert.assertFalse(isScope);
}
@Test
public void isScopeStringTest() {
for (int i = 0; i < 256; i++) {
boolean isScope = AclUtils.isScope(i + "");
Assert.assertTrue(isScope);
}
boolean isScope = AclUtils.isScope("-1");
Assert.assertFalse(isScope);
isScope = AclUtils.isScope("256");
Assert.assertFalse(isScope);
}
@Test
public void isScopeTest() {
for (int i = 0; i < 256; i++) {
boolean isScope = AclUtils.isScope(i);
Assert.assertTrue(isScope);
}
boolean isScope = AclUtils.isScope(-1);
Assert.assertFalse(isScope);
isScope = AclUtils.isScope(256);
Assert.assertFalse(isScope);
}
@Test
public void isAsteriskTest() {
boolean isAsterisk = AclUtils.isAsterisk("*");
Assert.assertTrue(isAsterisk);
isAsterisk = AclUtils.isAsterisk(",");
Assert.assertFalse(isAsterisk);
}
@Test
public void isColonTest() {
boolean isColon = AclUtils.isColon(",");
Assert.assertTrue(isColon);
isColon = AclUtils.isColon("-");
Assert.assertFalse(isColon);
}
@Test
public void isMinusTest() {
boolean isMinus = AclUtils.isMinus("-");
Assert.assertTrue(isMinus);
isMinus = AclUtils.isMinus("*");
Assert.assertFalse(isMinus);
}
@SuppressWarnings("unchecked")
@Test
public void getYamlDataObjectTest() {
Map<String, Object> map = AclUtils.getYamlDataObject("src/test/resources/conf/plain_acl.yml", Map.class);
Assert.assertFalse(map.isEmpty());
}
@Test(expected = Exception.class)
public void getYamlDataObjectExceptionTest() {
AclUtils.getYamlDataObject("plain_acl.yml", Map.class);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.common;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.rocketmq.acl.plain.PlainAccessResource;
import org.junit.Assert;
import org.junit.Test;
public class PermissionTest {
@Test
public void fromStringGetPermissionTest() {
byte perm = Permission.parsePermFromString("PUB");
Assert.assertEquals(perm, Permission.PUB);
perm = Permission.parsePermFromString("SUB");
Assert.assertEquals(perm, Permission.SUB);
perm = Permission.parsePermFromString("PUB|SUB");
Assert.assertEquals(perm, Permission.PUB|Permission.SUB);
perm = Permission.parsePermFromString("SUB|PUB");
Assert.assertEquals(perm, Permission.PUB|Permission.SUB);
perm = Permission.parsePermFromString("DENY");
Assert.assertEquals(perm, Permission.DENY);
perm = Permission.parsePermFromString("1");
Assert.assertEquals(perm, Permission.DENY);
perm = Permission.parsePermFromString(null);
Assert.assertEquals(perm, Permission.DENY);
}
@Test
public void checkPermissionTest() {
boolean boo = Permission.checkPermission(Permission.DENY, Permission.DENY);
Assert.assertFalse(boo);
boo = Permission.checkPermission(Permission.PUB, Permission.PUB);
Assert.assertTrue(boo);
boo = Permission.checkPermission(Permission.SUB, Permission.SUB);
Assert.assertTrue(boo);
boo = Permission.checkPermission(Permission.PUB, (byte) (Permission.PUB|Permission.SUB));
Assert.assertTrue(boo);
boo = Permission.checkPermission(Permission.SUB, (byte) (Permission.PUB|Permission.SUB));
Assert.assertTrue(boo);
boo = Permission.checkPermission(Permission.ANY, (byte) (Permission.PUB|Permission.SUB));
Assert.assertTrue(boo);
boo = Permission.checkPermission(Permission.ANY, Permission.SUB);
Assert.assertTrue(boo);
boo = Permission.checkPermission(Permission.ANY, Permission.PUB);
Assert.assertTrue(boo);
boo = Permission.checkPermission(Permission.DENY, Permission.ANY);
Assert.assertFalse(boo);
boo = Permission.checkPermission(Permission.DENY, Permission.PUB);
Assert.assertFalse(boo);
boo = Permission.checkPermission(Permission.DENY, Permission.SUB);
Assert.assertFalse(boo);
}
@Test(expected = AclException.class)
public void setTopicPermTest() {
PlainAccessResource plainAccessResource = new PlainAccessResource();
Map<String, Byte> resourcePermMap = plainAccessResource.getResourcePermMap();
Permission.parseResourcePerms(plainAccessResource, false, null);
Assert.assertNull(resourcePermMap);
List<String> groups = new ArrayList<>();
Permission.parseResourcePerms(plainAccessResource, false, groups);
Assert.assertNull(resourcePermMap);
groups.add("groupA=DENY");
groups.add("groupB=PUB|SUB");
groups.add("groupC=PUB");
Permission.parseResourcePerms(plainAccessResource, false, groups);
resourcePermMap = plainAccessResource.getResourcePermMap();
byte perm = resourcePermMap.get(PlainAccessResource.getRetryTopic("groupA"));
Assert.assertEquals(perm, Permission.DENY);
perm = resourcePermMap.get(PlainAccessResource.getRetryTopic("groupB"));
Assert.assertEquals(perm,Permission.PUB|Permission.SUB);
perm = resourcePermMap.get(PlainAccessResource.getRetryTopic("groupC"));
Assert.assertEquals(perm, Permission.PUB);
List<String> topics = new ArrayList<>();
topics.add("topicA=DENY");
topics.add("topicB=PUB|SUB");
topics.add("topicC=PUB");
Permission.parseResourcePerms(plainAccessResource, true, topics);
perm = resourcePermMap.get("topicA");
Assert.assertEquals(perm, Permission.DENY);
perm = resourcePermMap.get("topicB");
Assert.assertEquals(perm, Permission.PUB|Permission.SUB);
perm = resourcePermMap.get("topicC");
Assert.assertEquals(perm, Permission.PUB);
List<String> erron = new ArrayList<>();
erron.add("");
Permission.parseResourcePerms(plainAccessResource, false, erron);
}
@Test
public void checkAdminCodeTest() {
Set<Integer> code = new HashSet<>();
code.add(17);
code.add(25);
code.add(215);
code.add(200);
code.add(207);
for (int i = 0; i < 400; i++) {
boolean boo = Permission.needAdminPerm(i);
if (boo) {
Assert.assertTrue(code.contains(i));
}
}
}
}
package org.apache.rocketmq.acl.common;
import org.junit.Assert;
import org.junit.Test;
import java.util.Properties;
public class SessionCredentialsTest {
@Test
public void equalsTest(){
SessionCredentials sessionCredentials=new SessionCredentials("RocketMQ","12345678");
sessionCredentials.setSecurityToken("abcd");
SessionCredentials other=new SessionCredentials("RocketMQ","12345678","abcd");
Assert.assertTrue(sessionCredentials.equals(other));
}
@Test
public void updateContentTest(){
SessionCredentials sessionCredentials=new SessionCredentials();
Properties properties=new Properties();
properties.setProperty(SessionCredentials.ACCESS_KEY,"RocketMQ");
properties.setProperty(SessionCredentials.SECRET_KEY,"12345678");
properties.setProperty(SessionCredentials.SECURITY_TOKEN,"abcd");
sessionCredentials.updateContent(properties);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plain;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Set;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.AclUtils;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.header.*;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
import org.apache.rocketmq.common.protocol.heartbeat.ProducerData;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class PlainAccessValidatorTest {
private PlainAccessValidator plainAccessValidator;
private AclClientRPCHook aclClient;
private SessionCredentials sessionCredentials;
@Before
public void init() {
System.setProperty("rocketmq.home.dir", "src/test/resources");
plainAccessValidator = new PlainAccessValidator();
sessionCredentials = new SessionCredentials();
sessionCredentials.setAccessKey("RocketMQ");
sessionCredentials.setSecretKey("12345678");
sessionCredentials.setSecurityToken("87654321");
aclClient = new AclClientRPCHook(sessionCredentials);
}
@Test
public void contentTest() {
SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader();
messageRequestHeader.setTopic("topicA");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader);
aclClient.doBeforeRequest("", remotingCommand);
ByteBuffer buf = remotingCommand.encodeHeader();
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "127.0.0.1");
String signature = AclUtils.calSignature(accessResource.getContent(), sessionCredentials.getSecretKey());
Assert.assertEquals(accessResource.getSignature(), signature);
}
@Test
public void validateTest() {
SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader();
messageRequestHeader.setTopic("topicB");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader);
aclClient.doBeforeRequest("", remotingCommand);
ByteBuffer buf = remotingCommand.encodeHeader();
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1");
plainAccessValidator.validate(accessResource);
}
@Test
public void validateSendMessageTest() {
SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader();
messageRequestHeader.setTopic("topicB");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader);
aclClient.doBeforeRequest("", remotingCommand);
ByteBuffer buf = remotingCommand.encodeHeader();
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1");
plainAccessValidator.validate(accessResource);
}
@Test
public void validateSendMessageV2Test() {
SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader();
messageRequestHeader.setTopic("topicC");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(messageRequestHeader));
aclClient.doBeforeRequest("", remotingCommand);
ByteBuffer buf = remotingCommand.encodeHeader();
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876");
plainAccessValidator.validate(accessResource);
}
@Test
public void validatePullMessageTest() {
PullMessageRequestHeader pullMessageRequestHeader=new PullMessageRequestHeader();
pullMessageRequestHeader.setTopic("topicC");
pullMessageRequestHeader.setConsumerGroup("consumerGroupA");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE,pullMessageRequestHeader);
aclClient.doBeforeRequest("", remotingCommand);
ByteBuffer buf = remotingCommand.encodeHeader();
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876");
plainAccessValidator.validate(accessResource);
}
@Test
public void validateConsumeMessageBackTest() {
ConsumerSendMsgBackRequestHeader consumerSendMsgBackRequestHeader=new ConsumerSendMsgBackRequestHeader();
consumerSendMsgBackRequestHeader.setOriginTopic("topicC");
consumerSendMsgBackRequestHeader.setGroup("consumerGroupA");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK,consumerSendMsgBackRequestHeader);
aclClient.doBeforeRequest("", remotingCommand);
ByteBuffer buf = remotingCommand.encodeHeader();
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876");
plainAccessValidator.validate(accessResource);
}
@Test
public void validateQueryMessageTest() {
QueryMessageRequestHeader queryMessageRequestHeader=new QueryMessageRequestHeader();
queryMessageRequestHeader.setTopic("topicC");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.QUERY_MESSAGE,queryMessageRequestHeader);
aclClient.doBeforeRequest("", remotingCommand);
ByteBuffer buf = remotingCommand.encodeHeader();
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876");
plainAccessValidator.validate(accessResource);
}
@Test
public void validateHeartBeatTest() {
HeartbeatData heartbeatData=new HeartbeatData();
Set<ProducerData> producerDataSet=new HashSet<>();
Set<ConsumerData> consumerDataSet=new HashSet<>();
Set<SubscriptionData> subscriptionDataSet=new HashSet<>();
ProducerData producerData=new ProducerData();
producerData.setGroupName("producerGroupA");
ConsumerData consumerData=new ConsumerData();
consumerData.setGroupName("consumerGroupA");
SubscriptionData subscriptionData=new SubscriptionData();
subscriptionData.setTopic("topicC");
producerDataSet.add(producerData);
consumerDataSet.add(consumerData);
subscriptionDataSet.add(subscriptionData);
consumerData.setSubscriptionDataSet(subscriptionDataSet);
heartbeatData.setProducerDataSet(producerDataSet);
heartbeatData.setConsumerDataSet(consumerDataSet);
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT,null);
remotingCommand.setBody(heartbeatData.encode());
aclClient.doBeforeRequest("", remotingCommand);
ByteBuffer buf = remotingCommand.encode();
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876");
plainAccessValidator.validate(accessResource);
}
@Test
public void validateUnRegisterClientTest() {
UnregisterClientRequestHeader unregisterClientRequestHeader=new UnregisterClientRequestHeader();
unregisterClientRequestHeader.setConsumerGroup("consumerGroupA");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT,unregisterClientRequestHeader);
aclClient.doBeforeRequest("", remotingCommand);
ByteBuffer buf = remotingCommand.encodeHeader();
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876");
plainAccessValidator.validate(accessResource);
}
@Test
public void validateGetConsumerListByGroupTest() {
GetConsumerListByGroupRequestHeader getConsumerListByGroupRequestHeader=new GetConsumerListByGroupRequestHeader();
getConsumerListByGroupRequestHeader.setConsumerGroup("consumerGroupA");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_LIST_BY_GROUP,getConsumerListByGroupRequestHeader);
aclClient.doBeforeRequest("", remotingCommand);
ByteBuffer buf = remotingCommand.encodeHeader();
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876");
plainAccessValidator.validate(accessResource);
}
@Test
public void validateUpdateConsumerOffSetTest() {
UpdateConsumerOffsetRequestHeader updateConsumerOffsetRequestHeader=new UpdateConsumerOffsetRequestHeader();
updateConsumerOffsetRequestHeader.setConsumerGroup("consumerGroupA");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET,updateConsumerOffsetRequestHeader);
aclClient.doBeforeRequest("", remotingCommand);
ByteBuffer buf = remotingCommand.encodeHeader();
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876");
plainAccessValidator.validate(accessResource);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plain;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.acl.common.AclException;
import org.apache.rocketmq.acl.common.Permission;
import org.apache.rocketmq.acl.plain.PlainPermissionLoader.PlainAccessConfig;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class PlainPermissionLoaderTest {
PlainPermissionLoader plainPermissionLoader;
PlainAccessResource PUBPlainAccessResource;
PlainAccessResource SUBPlainAccessResource;
PlainAccessResource ANYPlainAccessResource;
PlainAccessResource DENYPlainAccessResource;
PlainAccessResource plainAccessResource = new PlainAccessResource();
PlainAccessResource plainAccessResourceTwo = new PlainAccessResource();
Set<Integer> adminCode = new HashSet<>();
@Before
public void init() throws NoSuchFieldException, SecurityException, IOException {
// UPDATE_AND_CREATE_TOPIC
adminCode.add(17);
// UPDATE_BROKER_CONFIG
adminCode.add(25);
// DELETE_TOPIC_IN_BROKER
adminCode.add(215);
// UPDATE_AND_CREATE_SUBSCRIPTIONGROUP
adminCode.add(200);
// DELETE_SUBSCRIPTIONGROUP
adminCode.add(207);
PUBPlainAccessResource = clonePlainAccessResource(Permission.PUB);
SUBPlainAccessResource = clonePlainAccessResource(Permission.SUB);
ANYPlainAccessResource = clonePlainAccessResource(Permission.ANY);
DENYPlainAccessResource = clonePlainAccessResource(Permission.DENY);
System.setProperty("java.version", "1.6.11");
System.setProperty("rocketmq.home.dir", "src/test/resources");
System.setProperty("romcketmq.acl.plain.fileName", "/conf/plain_acl.yml");
plainPermissionLoader = new PlainPermissionLoader();
}
public PlainAccessResource clonePlainAccessResource(byte perm) {
PlainAccessResource painAccessResource = new PlainAccessResource();
painAccessResource.setAccessKey("RocketMQ");
painAccessResource.setSecretKey("12345678");
painAccessResource.setWhiteRemoteAddress("127.0." + perm + ".*");
painAccessResource.setDefaultGroupPerm(perm);
painAccessResource.setDefaultTopicPerm(perm);
painAccessResource.addResourceAndPerm(PlainAccessResource.getRetryTopic("groupA"), Permission.PUB);
painAccessResource.addResourceAndPerm(PlainAccessResource.getRetryTopic("groupB"), Permission.SUB);
painAccessResource.addResourceAndPerm(PlainAccessResource.getRetryTopic("groupC"), Permission.ANY);
painAccessResource.addResourceAndPerm(PlainAccessResource.getRetryTopic("groupD"), Permission.DENY);
painAccessResource.addResourceAndPerm("topicA", Permission.PUB);
painAccessResource.addResourceAndPerm("topicB", Permission.SUB);
painAccessResource.addResourceAndPerm("topicC", Permission.ANY);
painAccessResource.addResourceAndPerm("topicD", Permission.DENY);
return painAccessResource;
}
@Test
public void getPlainAccessResourceTest() {
PlainAccessResource plainAccessResource = new PlainAccessResource();
PlainAccessConfig plainAccess = new PlainAccessConfig();
plainAccess.setAccessKey("RocketMQ");
plainAccessResource = plainPermissionLoader.getPlainAccessResource(plainAccess);
Assert.assertEquals(plainAccessResource.getAccessKey(), "RocketMQ");
plainAccess.setSecretKey("12345678");
plainAccessResource = plainPermissionLoader.getPlainAccessResource(plainAccess);
Assert.assertEquals(plainAccessResource.getSecretKey(), "12345678");
plainAccess.setWhiteRemoteAddress("127.0.0.1");
plainAccessResource = plainPermissionLoader.getPlainAccessResource(plainAccess);
Assert.assertEquals(plainAccessResource.getWhiteRemoteAddress(), "127.0.0.1");
plainAccess.setAdmin(true);
plainAccessResource = plainPermissionLoader.getPlainAccessResource(plainAccess);
Assert.assertEquals(plainAccessResource.isAdmin(), true);
List<String> groups = new ArrayList<String>();
groups.add("groupA=DENY");
groups.add("groupB=PUB|SUB");
groups.add("groupC=PUB");
plainAccess.setGroupPerms(groups);
plainAccessResource = plainPermissionLoader.getPlainAccessResource(plainAccess);
Map<String, Byte> resourcePermMap = plainAccessResource.getResourcePermMap();
Assert.assertEquals(resourcePermMap.size(), 3);
Assert.assertEquals(resourcePermMap.get(PlainAccessResource.getRetryTopic("groupA")).byteValue(), Permission.DENY);
Assert.assertEquals(resourcePermMap.get(PlainAccessResource.getRetryTopic("groupB")).byteValue(), Permission.PUB|Permission.SUB);
Assert.assertEquals(resourcePermMap.get(PlainAccessResource.getRetryTopic("groupC")).byteValue(), Permission.PUB);
List<String> topics = new ArrayList<String>();
topics.add("topicA=DENY");
topics.add("topicB=PUB|SUB");
topics.add("topicC=PUB");
plainAccess.setTopicPerms(topics);
plainAccessResource = plainPermissionLoader.getPlainAccessResource(plainAccess);
resourcePermMap = plainAccessResource.getResourcePermMap();
Assert.assertEquals(resourcePermMap.size(), 6);
Assert.assertEquals(resourcePermMap.get("topicA").byteValue(), Permission.DENY);
Assert.assertEquals(resourcePermMap.get("topicB").byteValue(), Permission.PUB|Permission.SUB);
Assert.assertEquals(resourcePermMap.get("topicC").byteValue(), Permission.PUB);
}
@Test(expected = AclException.class)
public void checkPermAdmin() {
PlainAccessResource plainAccessResource = new PlainAccessResource();
plainAccessResource.setRequestCode(17);
plainPermissionLoader.checkPerm(plainAccessResource, PUBPlainAccessResource);
}
@Test
public void checkPerm() {
PlainAccessResource plainAccessResource = new PlainAccessResource();
plainAccessResource.addResourceAndPerm("topicA", Permission.PUB);
plainPermissionLoader.checkPerm(plainAccessResource, PUBPlainAccessResource);
plainAccessResource.addResourceAndPerm("topicB", Permission.SUB);
plainPermissionLoader.checkPerm(plainAccessResource, ANYPlainAccessResource);
plainAccessResource = new PlainAccessResource();
plainAccessResource.addResourceAndPerm("topicB", Permission.SUB);
plainPermissionLoader.checkPerm(plainAccessResource, SUBPlainAccessResource);
plainAccessResource.addResourceAndPerm("topicA", Permission.PUB);
plainPermissionLoader.checkPerm(plainAccessResource, ANYPlainAccessResource);
}
@Test(expected = AclException.class)
public void accountNullTest() {
plainAccessResource.setAccessKey(null);
plainPermissionLoader.addPlainAccessResource(plainAccessResource);
}
@Test(expected = AclException.class)
public void accountThanTest() {
plainAccessResource.setAccessKey("123");
plainPermissionLoader.addPlainAccessResource(plainAccessResource);
}
@Test(expected = AclException.class)
public void passWordtNullTest() {
plainAccessResource.setAccessKey(null);
plainPermissionLoader.addPlainAccessResource(plainAccessResource);
}
@Test(expected = AclException.class)
public void passWordThanTest() {
plainAccessResource.setAccessKey("123");
plainPermissionLoader.addPlainAccessResource(plainAccessResource);
}
@Test(expected = AclException.class)
public void testPlainAclPlugEngineInit() {
System.setProperty("rocketmq.home.dir", "");
new PlainPermissionLoader().initialize();
}
@SuppressWarnings("unchecked")
@Test
public void cleanAuthenticationInfoTest() throws IllegalAccessException {
//plainPermissionLoader.addPlainAccessResource(plainAccessResource);
Map<String, List<PlainAccessResource>> plainAccessResourceMap = (Map<String, List<PlainAccessResource>>) FieldUtils.readDeclaredField(plainPermissionLoader, "plainAccessResourceMap", true);
Assert.assertFalse(plainAccessResourceMap.isEmpty());
plainPermissionLoader.clearPermissionInfo();
plainAccessResourceMap = (Map<String, List<PlainAccessResource>>) FieldUtils.readDeclaredField(plainPermissionLoader, "plainAccessResourceMap", true);
Assert.assertTrue(plainAccessResourceMap.isEmpty());
}
@Test
public void isWatchStartTest() {
System.setProperty("java.version", "1.7.11");
PlainPermissionLoader plainPermissionLoader = new PlainPermissionLoader();
Assert.assertTrue(plainPermissionLoader.isWatchStart());
System.setProperty("java.version", "1.6.11");
plainPermissionLoader = new PlainPermissionLoader();
Assert.assertFalse(plainPermissionLoader.isWatchStart());
}
@SuppressWarnings("unchecked")
@Test
public void watchTest() throws IOException, IllegalAccessException {
System.setProperty("java.version", "1.7.11");
System.setProperty("rocketmq.home.dir", "src/test/resources/watch");
File file = new File("src/test/resources/watch/conf");
file.mkdirs();
File transport = new File("src/test/resources/watch/conf/plain_acl.yml");
transport.delete();
transport.createNewFile();
FileWriter writer = new FileWriter(transport);
writer.write("accounts:\r\n");
writer.write("- accessKey: rokcetmq\r\n");
writer.write(" secretKey: aliyun11\r\n");
writer.write(" whiteRemoteAddress: 127.0.0.1\r\n");
writer.write(" admin: true\r\n");
writer.flush();
writer.close();
PlainPermissionLoader plainPermissionLoader = new PlainPermissionLoader();
Map<String, List<PlainAccessResource>> plainAccessResourceMap = (Map<String, List<PlainAccessResource>>) FieldUtils.readDeclaredField(plainPermissionLoader, "plainAccessResourceMap", true);
Assert.assertNotNull(plainAccessResourceMap.get("rokcetmq"));
writer = new FileWriter(new File("src/test/resources/watch/conf/plain_acl.yml"), true);
writer.write("- accessKey: rokcet1\r\n");
writer.write(" secretKey: aliyun1\r\n");
writer.write(" whiteRemoteAddress: 127.0.0.1\r\n");
writer.write(" admin: true\r\n");
writer.flush();
writer.close();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
plainAccessResourceMap = (Map<String, List<PlainAccessResource>>) FieldUtils.readDeclaredField(plainPermissionLoader, "plainAccessResourceMap", true);
Assert.assertNotNull(plainAccessResourceMap.get("rokcet1"));
}
@Test(expected = AclException.class)
public void initializeTest() {
System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl_null.yml");
new PlainPermissionLoader();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plain;
import org.apache.rocketmq.acl.common.AclException;
import org.junit.Assert;
import org.junit.Test;
public class RemoteAddressStrategyTest {
RemoteAddressStrategyFactory remoteAddressStrategyFactory = new RemoteAddressStrategyFactory();
@Test
public void netaddressStrategyFactoryExceptionTest() {
PlainAccessResource plainAccessResource = new PlainAccessResource();
remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
Assert.assertEquals(remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource).getClass(),
RemoteAddressStrategyFactory.BlankRemoteAddressStrategy.class);
}
@Test
public void netaddressStrategyFactoryTest() {
PlainAccessResource plainAccessResource = new PlainAccessResource();
plainAccessResource.setWhiteRemoteAddress("*");
RemoteAddressStrategy remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
Assert.assertEquals(remoteAddressStrategy, RemoteAddressStrategyFactory.NULL_NET_ADDRESS_STRATEGY);
plainAccessResource.setWhiteRemoteAddress("127.0.0.1");
remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
Assert.assertEquals(remoteAddressStrategy.getClass(), RemoteAddressStrategyFactory.OneRemoteAddressStrategy.class);
plainAccessResource.setWhiteRemoteAddress("127.0.0.1,127.0.0.2,127.0.0.3");
remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
Assert.assertEquals(remoteAddressStrategy.getClass(), RemoteAddressStrategyFactory.MultipleRemoteAddressStrategy.class);
plainAccessResource.setWhiteRemoteAddress("127.0.0.{1,2,3}");
remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
Assert.assertEquals(remoteAddressStrategy.getClass(), RemoteAddressStrategyFactory.MultipleRemoteAddressStrategy.class);
plainAccessResource.setWhiteRemoteAddress("127.0.0.1-200");
remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
Assert.assertEquals(remoteAddressStrategy.getClass(), RemoteAddressStrategyFactory.RangeRemoteAddressStrategy.class);
plainAccessResource.setWhiteRemoteAddress("127.0.0.*");
remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
Assert.assertEquals(remoteAddressStrategy.getClass(), RemoteAddressStrategyFactory.RangeRemoteAddressStrategy.class);
plainAccessResource.setWhiteRemoteAddress("127.0.1-20.*");
remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
Assert.assertEquals(remoteAddressStrategy.getClass(), RemoteAddressStrategyFactory.RangeRemoteAddressStrategy.class);
plainAccessResource.setWhiteRemoteAddress("");
remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
Assert.assertEquals(remoteAddressStrategy.getClass(), RemoteAddressStrategyFactory.BlankRemoteAddressStrategy.class);
}
@Test(expected = AclException.class)
public void verifyTest() {
PlainAccessResource plainAccessResource = new PlainAccessResource();
plainAccessResource.setWhiteRemoteAddress("127.0.0.1");
remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
plainAccessResource.setWhiteRemoteAddress("256.0.0.1");
remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
}
@Test
public void nullNetaddressStrategyTest() {
boolean isMatch = RemoteAddressStrategyFactory.NULL_NET_ADDRESS_STRATEGY.match(new PlainAccessResource());
Assert.assertTrue(isMatch);
}
@Test
public void blankNetaddressStrategyTest() {
boolean isMatch = RemoteAddressStrategyFactory.BLANK_NET_ADDRESS_STRATEGY.match(new PlainAccessResource());
Assert.assertFalse(isMatch);
}
public void oneNetaddressStrategyTest() {
PlainAccessResource plainAccessResource = new PlainAccessResource();
plainAccessResource.setWhiteRemoteAddress("127.0.0.1");
RemoteAddressStrategy remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
plainAccessResource.setWhiteRemoteAddress("");
boolean match = remoteAddressStrategy.match(plainAccessResource);
Assert.assertFalse(match);
plainAccessResource.setWhiteRemoteAddress("127.0.0.2");
match = remoteAddressStrategy.match(plainAccessResource);
Assert.assertFalse(match);
plainAccessResource.setWhiteRemoteAddress("127.0.0.1");
match = remoteAddressStrategy.match(plainAccessResource);
Assert.assertTrue(match);
}
@Test
public void multipleNetaddressStrategyTest() {
PlainAccessResource plainAccessResource = new PlainAccessResource();
plainAccessResource.setWhiteRemoteAddress("127.0.0.1,127.0.0.2,127.0.0.3");
RemoteAddressStrategy remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
multipleNetaddressStrategyTest(remoteAddressStrategy);
plainAccessResource.setWhiteRemoteAddress("127.0.0.{1,2,3}");
remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
multipleNetaddressStrategyTest(remoteAddressStrategy);
}
@Test(expected = AclException.class)
public void multipleNetaddressStrategyExceptionTest() {
PlainAccessResource plainAccessResource = new PlainAccessResource();
plainAccessResource.setWhiteRemoteAddress("127.0.0.1,2,3}");
remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
}
private void multipleNetaddressStrategyTest(RemoteAddressStrategy remoteAddressStrategy) {
PlainAccessResource plainAccessResource = new PlainAccessResource();
plainAccessResource.setWhiteRemoteAddress("127.0.0.1");
boolean match = remoteAddressStrategy.match(plainAccessResource);
Assert.assertTrue(match);
plainAccessResource.setWhiteRemoteAddress("127.0.0.2");
match = remoteAddressStrategy.match(plainAccessResource);
Assert.assertTrue(match);
plainAccessResource.setWhiteRemoteAddress("127.0.0.3");
match = remoteAddressStrategy.match(plainAccessResource);
Assert.assertTrue(match);
plainAccessResource.setWhiteRemoteAddress("127.0.0.4");
match = remoteAddressStrategy.match(plainAccessResource);
Assert.assertFalse(match);
plainAccessResource.setWhiteRemoteAddress("127.0.0.0");
match = remoteAddressStrategy.match(plainAccessResource);
Assert.assertFalse(match);
}
@Test
public void rangeNetaddressStrategyTest() {
String head = "127.0.0.";
PlainAccessResource plainAccessResource = new PlainAccessResource();
plainAccessResource.setWhiteRemoteAddress("127.0.0.1-200");
RemoteAddressStrategy remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
rangeNetaddressStrategyTest(remoteAddressStrategy, head, 1, 200, true);
plainAccessResource.setWhiteRemoteAddress("127.0.0.*");
remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
rangeNetaddressStrategyTest(remoteAddressStrategy, head, 0, 255, true);
plainAccessResource.setWhiteRemoteAddress("127.0.1-200.*");
remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
rangeNetaddressStrategyThirdlyTest(remoteAddressStrategy, head, 1, 200);
}
private void rangeNetaddressStrategyTest(RemoteAddressStrategy remoteAddressStrategy, String head, int start,
int end,
boolean isFalse) {
PlainAccessResource plainAccessResource = new PlainAccessResource();
for (int i = -10; i < 300; i++) {
plainAccessResource.setWhiteRemoteAddress(head + i);
boolean match = remoteAddressStrategy.match(plainAccessResource);
if (isFalse && i >= start && i <= end) {
Assert.assertTrue(match);
continue;
}
Assert.assertFalse(match);
}
}
private void rangeNetaddressStrategyThirdlyTest(RemoteAddressStrategy remoteAddressStrategy, String head, int start,
int end) {
String newHead;
for (int i = -10; i < 300; i++) {
newHead = head + i;
if (i >= start && i <= end) {
rangeNetaddressStrategyTest(remoteAddressStrategy, newHead, 0, 255, false);
}
}
}
@Test(expected = AclException.class)
public void rangeNetaddressStrategyExceptionStartGreaterEndTest() {
rangeNetaddressStrategyExceptionTest("127.0.0.2-1");
}
@Test(expected = AclException.class)
public void rangeNetaddressStrategyExceptionScopeTest() {
rangeNetaddressStrategyExceptionTest("127.0.0.-1-200");
}
@Test(expected = AclException.class)
public void rangeNetaddressStrategyExceptionScopeTwoTest() {
rangeNetaddressStrategyExceptionTest("127.0.0.0-256");
}
private void rangeNetaddressStrategyExceptionTest(String netaddress) {
PlainAccessResource plainAccessResource = new PlainAccessResource();
plainAccessResource.setWhiteRemoteAddress(netaddress);
remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
}
}
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
## suggested format
globalWhiteRemoteAddresses:
- 10.10.103.*
- 192.168.0.*
accounts:
- accessKey: RocketMQ
secretKey: 12345678
whiteRemoteAddress: 192.168.0.*
admin: false
defaultTopicPerm: DENY
defaultGroupPerm: SUB
topicPerms:
- topicA=DENY
- topicB=PUB|SUB
- topicC=SUB
groupPerms:
# the group should convert to retry topic
- groupA=DENY
- groupB=SUB
- groupC=SUB
- accessKey: aliyun.com
secretKey: 12345678
whiteRemoteAddress: 192.168.1.*
# if it is admin, it could access all resources
admin: true
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
## suggested format
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. The ASF licenses this file to
You under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
......@@ -52,6 +48,10 @@
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-filter</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-acl</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
......
......@@ -31,6 +31,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.broker.client.ClientHousekeepingService;
import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
import org.apache.rocketmq.broker.client.ConsumerManager;
......@@ -91,6 +92,7 @@ import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.srvutil.FileWatchService;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageArrivingListener;
......@@ -157,6 +159,7 @@ public class BrokerController {
private TransactionalMessageService transactionalMessageService;
private AbstractTransactionalMessageCheckListener transactionalMessageCheckListener;
public BrokerController(
final BrokerConfig brokerConfig,
final NettyServerConfig nettyServerConfig,
......@@ -467,6 +470,8 @@ public class BrokerController {
}
}
initialTransaction();
initialAcl();
initialRpcHooks();
}
return result;
}
......@@ -486,6 +491,47 @@ public class BrokerController {
this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
}
private void initialAcl() {
if (!this.brokerConfig.isEnableAcl()) {
log.info("The broker dose not enable acl");
return;
}
List<AccessValidator> accessValidators = ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class);
if (accessValidators == null || accessValidators.isEmpty()) {
log.info("The broker dose not load the AccessValidator");
return;
}
for (AccessValidator accessValidator: accessValidators) {
final AccessValidator validator = accessValidator;
this.registerServerRPCHook(new RPCHook() {
@Override
public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
//Do not catch the exception
validator.validate(validator.parse(request, remoteAddr));
}
@Override
public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
}
});
}
}
private void initialRpcHooks() {
List<RPCHook> rpcHooks = ServiceProvider.load(ServiceProvider.RPC_HOOK_ID, RPCHook.class);
if (rpcHooks == null || rpcHooks.isEmpty()) {
return;
}
for (RPCHook rpcHook: rpcHooks) {
this.registerServerRPCHook(rpcHook);
}
}
public void registerProcessor() {
/**
* SendMessageProcessor
......@@ -989,6 +1035,7 @@ public class BrokerController {
public void registerServerRPCHook(RPCHook rpcHook) {
getRemotingServer().registerRPCHook(rpcHook);
this.fastRemotingServer.registerRPCHook(rpcHook);
}
public RemotingServer getRemotingServer() {
......@@ -1049,7 +1096,9 @@ public class BrokerController {
this.transactionalMessageCheckListener = transactionalMessageCheckListener;
}
public BlockingQueue<Runnable> getEndTransactionThreadPoolQueue() {
return endTransactionThreadPoolQueue;
}
}
......@@ -34,6 +34,14 @@ public class ServiceProvider {
public static final String TRANSACTION_LISTENER_ID = "META-INF/service/org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener";
public static final String RPC_HOOK_ID = "META-INF/service/org.apache.rocketmq.remoting.RPCHook";
public static final String ACL_VALIDATOR_ID = "META-INF/service/org.apache.rocketmq.acl.AccessValidator";
static {
thisClassLoader = getClassLoader(ServiceProvider.class);
}
......
org.apache.rocketmq.acl.plain.PlainAccessValidator
\ No newline at end of file
......@@ -24,6 +24,7 @@ import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.After;
import org.junit.Ignore;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
......
......@@ -17,12 +17,15 @@
package org.apache.rocketmq.broker.util;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.List;
public class ServiceProviderTest {
@Test
......@@ -38,4 +41,10 @@ public class ServiceProviderTest {
AbstractTransactionalMessageCheckListener.class);
assertThat(listener).isNotNull();
}
@Test
public void loadAccessValidatorTest() {
List<AccessValidator> accessValidators = ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class);
assertThat(accessValidators).isNotNull();
}
}
org.apache.rocketmq.acl.plain.PlainAccessValidator
\ No newline at end of file
......@@ -174,6 +174,23 @@ public class BrokerConfig {
@ImportantField
private long transactionCheckInterval = 60 * 1000;
/**
* Acl feature switch
*/
@ImportantField
private boolean enableAcl = false;
public static String localHostName() {
try {
return InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
log.error("Failed to obtain the host name", e);
}
return "DEFAULT_BROKER";
}
public boolean isTraceOn() {
return traceOn;
}
......@@ -238,16 +255,6 @@ public class BrokerConfig {
this.slaveReadEnable = slaveReadEnable;
}
public static String localHostName() {
try {
return InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
log.error("Failed to obtain the host name", e);
}
return "DEFAULT_BROKER";
}
public int getRegisterBrokerTimeoutMills() {
return registerBrokerTimeoutMills;
}
......@@ -712,6 +719,14 @@ public class BrokerConfig {
this.transactionCheckInterval = transactionCheckInterval;
}
public boolean isEnableAcl() {
return enableAcl;
}
public void setEnableAcl(boolean isAclPlug) {
this.enableAcl = isAclPlug;
}
public int getEndTransactionThreadPoolNums() {
return endTransactionThreadPoolNums;
}
......
......@@ -60,6 +60,18 @@ public class UtilAll {
}
}
public static void sleep(long sleepMs) {
if (sleepMs < 0) {
return;
}
try {
Thread.sleep(sleepMs);
} catch (Throwable ignored) {
}
}
public static String currentStackTrace() {
StringBuilder sb = new StringBuilder();
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
......
......@@ -37,4 +37,5 @@ public class LoggerName {
public static final String PROTECTION_LOGGER_NAME = "RocketmqProtection";
public static final String WATER_MARK_LOGGER_NAME = "RocketmqWaterMark";
public static final String FILTER_LOGGER_NAME = "RocketmqFilter";
public static final String ACL_PLUG_LOGGER_NAME = "RocketmqAclPlug";
}
......@@ -17,14 +17,13 @@
package org.apache.rocketmq.common;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
......
......@@ -20,3 +20,5 @@ deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
enableAcl=true
namesrvAddr=127.0.0.1:9876
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
globalWhiteRemoteAddresses:
accounts:
- accessKey: RocketMQ
secretKey: 12345678
whiteRemoteAddress:
admin: false
defaultTopicPerm: DENY
defaultGroupPerm: SUB
topicPerms:
- topicA=DENY
- topicB=PUB|SUB
- topicC=SUB
groupPerms:
# the group should convert to retry topic
- groupA=DENY
- groupB=PUB|SUB
- groupC=SUB
- accessKey: aliyun.com
secretKey: 12345678
whiteRemoteAddress: 192.168.1.*
# if it is admin, it could access all resources
admin: true
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
accessKey: aliyun.com
secretKey: 12345678
......@@ -53,5 +53,10 @@
<artifactId>rocketmq-openmessaging</artifactId>
<version>4.4.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>4.4.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.simple;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
/**
*
* 1. view the /conf/plain_acl.yml file under the distribution module, pay attention to the accessKey,secretKey,
* globalWhiteRemoteAddresses and whiteRemoteAddress and some other attributes.
*
* 2. Modify ACL_ACCESS_KEY and ACL_SECRET_KEY to the corresponding accessKey and secretKey in plain_acl.yml
*
*/
public class AclClient {
private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();
private static final String ACL_ACCESS_KEY = "RocketMQ";
private static final String ACL_SECRET_KEY = "1234567";
public static void main(String[] args) throws MQClientException, InterruptedException {
producer();
pushConsumer();
pullConsumer();
}
public static void producer() throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", getAclRPCHook());
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 128; i++)
try {
{
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
public static void pushConsumer() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_5", getAclRPCHook(), new AllocateMessageQueueAveragely());
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//wrong time format 2017_0422_221800
consumer.setConsumeTimestamp("20180422221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
printBody(msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
public static void pullConsumer() throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_6", getAclRPCHook());
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.start();
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");
for (MessageQueue mq : mqs) {
System.out.printf("Consume from the queue: %s%n", mq);
SINGLE_MQ:
while (true) {
try {
PullResult pullResult =
consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
System.out.printf("%s%n", pullResult);
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
printBody(pullResult);
switch (pullResult.getPullStatus()) {
case FOUND:
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break SINGLE_MQ;
case OFFSET_ILLEGAL:
break;
default:
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
consumer.shutdown();
}
private static void printBody(PullResult pullResult) {
printBody(pullResult.getMsgFoundList());
}
private static void printBody(List<MessageExt> msg) {
if (msg == null || msg.size() == 0)
return;
for (MessageExt m : msg) {
if (m != null) {
System.out.printf("msgId : %s body : %s \n\r", m.getMsgId(), new String(m.getBody()));
}
}
}
private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = OFFSE_TABLE.get(mq);
if (offset != null)
return offset;
return 0;
}
private static void putMessageQueueOffset(MessageQueue mq, long offset) {
OFFSE_TABLE.put(mq, offset);
}
static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY));
}
}
......@@ -29,10 +29,10 @@ public class PullConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.start();
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1");
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("broker-a");
for (MessageQueue mq : mqs) {
System.out.printf("Consume from the queue: %s%n", mq);
SINGLE_MQ:
......
......@@ -24,6 +24,7 @@ import org.apache.rocketmq.common.message.MessageQueue;
public class PullConsumerTest {
public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.start();
try {
......
......@@ -16,7 +16,8 @@
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
<groupId>org.apache</groupId>
......@@ -125,6 +126,7 @@
<module>distribution</module>
<module>openmessaging</module>
<module>logging</module>
<module>acl</module>
</modules>
<build>
......@@ -157,7 +159,7 @@
</executions>
<configuration>
<rules>
<banCircularDependencies />
<banCircularDependencies/>
</rules>
<fail>true</fail>
</configuration>
......@@ -214,9 +216,9 @@
<execution>
<id>generate-effective-dependencies-pom</id>
<phase>generate-resources</phase>
<goals>
<!-- <goals>
<goal>effective-pom</goal>
</goals>
</goals> -->
<configuration>
<output>${project.build.directory}/effective-pom/effective-dependencies.xml</output>
</configuration>
......@@ -521,6 +523,11 @@
<artifactId>rocketmq-example</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
......@@ -581,6 +588,16 @@
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>1.19</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.9</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
......
......@@ -23,6 +23,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
......@@ -35,6 +36,8 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
......@@ -45,8 +48,6 @@ import org.apache.rocketmq.remoting.common.ServiceThread;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;
......@@ -95,6 +96,13 @@ public abstract class NettyRemotingAbstract {
*/
protected volatile SslContext sslContext;
/**
* custom rpc hooks
*/
protected List<RPCHook> rpcHooks = new ArrayList<RPCHook>();
static {
NettyLogger.initNettyLogger();
}
......@@ -158,6 +166,23 @@ public abstract class NettyRemotingAbstract {
}
}
protected void doBeforeRpcHooks(String addr, RemotingCommand request) {
if (rpcHooks.size() > 0) {
for (RPCHook rpcHook: rpcHooks) {
rpcHook.doBeforeRequest(addr, request);
}
}
}
protected void doAfterRpcHooks(String addr, RemotingCommand request, RemotingCommand response) {
if (rpcHooks.size() > 0) {
for (RPCHook rpcHook: rpcHooks) {
rpcHook.doAfterResponse(addr, request, response);
}
}
}
/**
* Process incoming request command issued by remote peer.
*
......@@ -174,15 +199,9 @@ public abstract class NettyRemotingAbstract {
@Override
public void run() {
try {
RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
if (rpcHook != null) {
rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
}
doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
if (rpcHook != null) {
rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
}
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
if (!cmd.isOnewayRPC()) {
if (response != null) {
......@@ -314,12 +333,29 @@ public abstract class NettyRemotingAbstract {
}
}
/**
* Custom RPC hook.
* Just be compatible with the previous version, use getRPCHooks instead.
*/
@Deprecated
protected RPCHook getRPCHook() {
if (rpcHooks.size() > 0) {
return rpcHooks.get(0);
}
return null;
}
/**
* Custom RPC hooks.
*
* @return RPC hook if specified; null otherwise.
* @return RPC hooks if specified; null otherwise.
*/
public abstract RPCHook getRPCHook();
public List<RPCHook> getRPCHooks() {
return rpcHooks;
}
/**
* This method specifies thread pool to use while invoking callback methods.
......
......@@ -53,6 +53,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
......@@ -64,8 +66,6 @@ import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
......@@ -94,7 +94,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
private ExecutorService callbackExecutor;
private final ChannelEventListener channelEventListener;
private DefaultEventExecutorGroup defaultEventExecutorGroup;
private RPCHook rpcHook;
public NettyRemotingClient(final NettyClientConfig nettyClientConfig) {
this(nettyClientConfig, null);
......@@ -283,7 +282,9 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
@Override
public void registerRPCHook(RPCHook rpcHook) {
this.rpcHook = rpcHook;
if (rpcHook != null && !rpcHooks.contains(rpcHook)) {
rpcHooks.add(rpcHook);
}
}
public void closeChannel(final Channel channel) {
......@@ -357,6 +358,8 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
}
}
@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
......@@ -364,17 +367,13 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
try {
if (this.rpcHook != null) {
this.rpcHook.doBeforeRequest(addr, request);
}
doBeforeRpcHooks(addr, request);
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
throw new RemotingTimeoutException("invokeSync call timeout");
}
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
if (this.rpcHook != null) {
this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
}
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
return response;
} catch (RemotingSendRequestException e) {
log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
......@@ -522,9 +521,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
try {
if (this.rpcHook != null) {
this.rpcHook.doBeforeRequest(addr, request);
}
doBeforeRpcHooks(addr, request);
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
throw new RemotingTooMuchRequestException("invokeAsync call timeout");
......@@ -547,9 +544,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
try {
if (this.rpcHook != null) {
this.rpcHook.doBeforeRequest(addr, request);
}
doBeforeRpcHooks(addr, request);
this.invokeOnewayImpl(channel, request, timeoutMillis);
} catch (RemotingSendRequestException e) {
log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
......@@ -592,10 +587,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
return channelEventListener;
}
@Override
public RPCHook getRPCHook() {
return this.rpcHook;
}
@Override
public ExecutorService getCallbackExecutor() {
......
......@@ -47,6 +47,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
......@@ -58,8 +60,6 @@ import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
......@@ -75,7 +75,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
private final Timer timer = new Timer("ServerHouseKeepingService", true);
private DefaultEventExecutorGroup defaultEventExecutorGroup;
private RPCHook rpcHook;
private int port = 0;
......@@ -266,7 +265,9 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
@Override
public void registerRPCHook(RPCHook rpcHook) {
this.rpcHook = rpcHook;
if (rpcHook != null && !rpcHooks.contains(rpcHook)) {
rpcHooks.add(rpcHook);
}
}
@Override
......@@ -318,10 +319,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
return channelEventListener;
}
@Override
public RPCHook getRPCHook() {
return this.rpcHook;
}
@Override
public ExecutorService getCallbackExecutor() {
......
......@@ -19,9 +19,13 @@ package org.apache.rocketmq.test.base;
import java.util.ArrayList;
import java.util.List;
import org.apache.log4j.Logger;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
......@@ -48,6 +52,7 @@ public class BaseConf {
private static Logger log = Logger.getLogger(BaseConf.class);
static {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
namesrvController = IntegrationTestBase.createAndStartNamesrv();
nsAddr = "127.0.0.1:" + namesrvController.getNettyServerConfig().getListenPort();
brokerController1 = IntegrationTestBase.createAndStartBroker(nsAddr);
......
......@@ -15,7 +15,8 @@
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
......@@ -36,6 +37,10 @@
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-client</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-acl</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-store</artifactId>
......@@ -60,5 +65,9 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
</dependency>
</dependencies>
</project>
......@@ -19,13 +19,16 @@ package org.apache.rocketmq.tools.command;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import ch.qos.logback.core.joran.spi.JoranException;
import java.util.ArrayList;
import java.util.List;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.AclUtils;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.remoting.RPCHook;
......@@ -129,7 +132,7 @@ public class MQAdminStartup {
System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr);
}
cmd.execute(commandLine, options, rpcHook);
cmd.execute(commandLine, options, getAclRPCHook(commandLine));
} else {
System.out.printf("The sub command %s not exist.%n", args[0]);
}
......@@ -211,7 +214,6 @@ public class MQAdminStartup {
private static void printHelp() {
System.out.printf("The most commonly used mqadmin commands are:%n");
for (SubCommand cmd : subCommandList) {
System.out.printf(" %-20s %s%n", cmd.commandName(), cmd.commandDesc());
}
......@@ -243,4 +245,25 @@ public class MQAdminStartup {
public static void initCommand(SubCommand command) {
subCommandList.add(command);
}
public static RPCHook getAclRPCHook(CommandLine commandLine) {
String fileHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
String fileName = "/conf/tools.yml";
JSONObject yamlDataObject = AclUtils.getYamlDataObject(fileHome + fileName ,
JSONObject.class);
if (yamlDataObject == null || yamlDataObject.isEmpty()) {
System.out.printf(" Cannot find conf file %s, acl is not be enabled.%n" ,fileHome + fileName);
return null;
}
// admin ak sk
String accessKey = yamlDataObject.getString("accessKey");
String secretKey = yamlDataObject.getString("secretKey");
if (StringUtils.isBlank(accessKey) || StringUtils.isBlank(secretKey)) {
System.out.printf("AccessKey or secretKey is blank, the acl is not enabled.%n");
return null;
}
return new AclClientRPCHook(new SessionCredentials(accessKey,secretKey));
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册