提交 23a24c44 编写于 作者: L laohu

Seamless cloud

上级 87d85991
......@@ -8,8 +8,7 @@
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
<project
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
xmlns="http://maven.apache.org/POM/4.0.0">
<modelVersion>4.0.0</modelVersion>
......@@ -50,5 +49,17 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
<version>1.7.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito2</artifactId>
<version>1.7.1</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.common;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import java.lang.reflect.Field;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import static org.apache.rocketmq.acl.common.SessionCredentials.AccessKey;
import static org.apache.rocketmq.acl.common.SessionCredentials.SecurityToken;
import static org.apache.rocketmq.acl.common.SessionCredentials.Signature;
public class AclClientRPCHook implements RPCHook {
private final SessionCredentials sessionCredentials;
protected ConcurrentHashMap<Class<? extends CommandCustomHeader>, Field[]> fieldCache =
new ConcurrentHashMap<Class<? extends CommandCustomHeader>, Field[]>();
private final SessionCredentials sessionCredentials;
public AclClientRPCHook(SessionCredentials sessionCredentials) {
this.sessionCredentials = sessionCredentials;
}
......@@ -37,7 +50,6 @@ public class AclClientRPCHook implements RPCHook {
}
}
@Override
public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.common;
public class AclException extends RuntimeException {
......@@ -6,27 +22,31 @@ public class AclException extends RuntimeException {
private String status;
private int code;
public AclException(String status, int code) {
super();
this.status = status;
this.code = code;
}
public AclException(String status, int code, String message) {
super(message);
this.status = status;
this.code = code;
}
public AclException(String status, int code, Throwable throwable) {
super(throwable);
this.status = status;
this.code = code;
}
public AclException(String message) {
super(message);
}
public AclException(String message, Throwable throwable) {
super(message, throwable);
}
public AclException(String status, int code, String message, Throwable throwable) {
super(message, throwable);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.common;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import java.nio.charset.Charset;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import org.apache.commons.codec.binary.Base64;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
public class AclSigner {
public static final Charset defaultCharset = Charset.forName("UTF-8");
......@@ -20,7 +35,8 @@ public class AclSigner {
return calSignature(data, key, defaultAlgorithm, defaultCharset);
}
public static String calSignature(String data, String key, SigningAlgorithm algorithm, Charset charset) throws AclException {
public static String calSignature(String data, String key, SigningAlgorithm algorithm,
Charset charset) throws AclException {
return signAndBase64Encode(data, key, algorithm, charset);
}
......@@ -52,7 +68,8 @@ public class AclSigner {
return calSignature(data, key, defaultAlgorithm, defaultCharset);
}
public static String calSignature(byte[] data, String key, SigningAlgorithm algorithm, Charset charset) throws AclException {
public static String calSignature(byte[] data, String key, SigningAlgorithm algorithm,
Charset charset) throws AclException {
return signAndBase64Encode(data, key, algorithm, charset);
}
......
......@@ -22,7 +22,6 @@ import java.io.IOException;
import java.util.Map;
import java.util.SortedMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.plain.AclPlugRuntimeException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.yaml.snakeyaml.Yaml;
......@@ -45,7 +44,6 @@ public class AclUtils {
}
}
public static byte[] combineBytes(byte[] b1, byte[] b2) {
int size = (null != b1 ? b1.length : 0) + (null != b2 ? b2.length : 0);
byte[] total = new byte[size];
......@@ -56,7 +54,6 @@ public class AclUtils {
return total;
}
public static String calSignature(byte[] data, String secretKey) {
String signature = AclSigner.calSignature(data, secretKey);
return signature;
......@@ -64,7 +61,7 @@ public class AclUtils {
public static void verify(String netaddress, int index) {
if (!AclUtils.isScope(netaddress, index)) {
throw new AclPlugRuntimeException(String.format("netaddress examine scope Exception netaddress is %s", netaddress));
throw new AclException(String.format("netaddress examine scope Exception netaddress is %s", netaddress));
}
}
......@@ -128,15 +125,16 @@ public class AclUtils {
fis = new FileInputStream(new File(path));
return ymal.loadAs(fis, clazz);
} catch (Exception e) {
throw new AclPlugRuntimeException(String.format("The transport.yml file for Plain mode was not found , paths %s", path), e);
throw new AclException(String.format("The file for Plain mode was not found , paths %s", path), e);
} finally {
if (fis != null) {
try {
fis.close();
} catch (IOException e) {
throw new AclPlugRuntimeException("close transport fileInputStream Exception", e);
throw new AclException("close transport fileInputStream Exception", e);
}
}
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.common;
import com.alibaba.fastjson.JSONArray;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.plain.PlainAccessResource;
public class Permission {
public static final byte DENY = 1;
......@@ -7,7 +29,22 @@ public class Permission {
public static final byte PUB = 1 << 2;
public static final byte SUB = 1 << 3;
public boolean checkPermission(byte neededPerm, byte ownedPerm) {
public static final Set<Integer> ADMIN_CODE = new HashSet<Integer>();
static {
// UPDATE_AND_CREATE_TOPIC
ADMIN_CODE.add(17);
// UPDATE_BROKER_CONFIG
ADMIN_CODE.add(25);
// DELETE_TOPIC_IN_BROKER
ADMIN_CODE.add(215);
// UPDATE_AND_CREATE_SUBSCRIPTIONGROUP
ADMIN_CODE.add(200);
// DELETE_SUBSCRIPTIONGROUP
ADMIN_CODE.add(207);
}
public static boolean checkPermission(byte neededPerm, byte ownedPerm) {
if ((ownedPerm & DENY) > 0) {
return false;
}
......@@ -17,4 +54,43 @@ public class Permission {
return (neededPerm & ownedPerm) > 0;
}
public static byte fromStringGetPermission(String permString) {
if (permString == null) {
return Permission.DENY;
}
switch (permString.trim()) {
case "PUB":
return Permission.PUB;
case "SUB":
return Permission.SUB;
case "ANY":
return Permission.ANY;
case "PUB|SUB":
return Permission.ANY;
case "SUB|PUB":
return Permission.ANY;
case "DENY":
return Permission.DENY;
default:
return Permission.DENY;
}
}
public static void setTopicPerm(PlainAccessResource plainAccessResource, Boolean isTopic, JSONArray topicArray) {
if (topicArray == null || topicArray.isEmpty()) {
return;
}
for (int i = 0; i < topicArray.size(); i++) {
String[] topicPrem = StringUtils.split(topicArray.getString(i), "=");
if (topicPrem.length == 2) {
plainAccessResource.addResourceAndPerm(isTopic ? topicPrem[0] : PlainAccessResource.getRetryTopic(topicPrem[0]), fromStringGetPermission(topicPrem[1]));
} else {
throw new AclException(String.format("%s Permission config erron %s", isTopic ? "topic" : "group", topicArray.getString(i)));
}
}
}
public static boolean checkAdminCode(Integer code) {
return ADMIN_CODE.contains(code);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.common;
import org.apache.rocketmq.common.MixAll;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Properties;
import org.apache.rocketmq.common.MixAll;
public class SessionCredentials {
public static final Charset CHARSET = Charset.forName("UTF-8");
......@@ -45,7 +61,6 @@ public class SessionCredentials {
this.securityToken = securityToken;
}
public void updateContent(Properties prop) {
{
String value = prop.getProperty(AccessKey);
......@@ -99,8 +114,6 @@ public class SessionCredentials {
this.securityToken = securityToken;
}
@Override
public int hashCode() {
final int prime = 31;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.common;//package com.aliyun.openservices.ons.api.impl.rocketmq.spas;
public enum SigningAlgorithm {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plain;
/**
* Use AclException instead
*/
@Deprecated
public class AclPlugRuntimeException extends RuntimeException {
private static final long serialVersionUID = 6062101368637228900L;
public AclPlugRuntimeException(String message) {
super(message);
}
public AclPlugRuntimeException(String message, Throwable cause) {
super(message, cause);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plain;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
@Deprecated
public class AuthenticationInfo {
private PlainAccessResource plainAccessResource;
private RemoteAddressStrategy remoteAddressStrategy;
private Map<Integer, Boolean> authority;
public AuthenticationInfo(Map<Integer, Boolean> authority, PlainAccessResource plainAccessResource,
RemoteAddressStrategy remoteAddressStrategy) {
super();
this.authority = authority;
this.plainAccessResource = plainAccessResource;
this.remoteAddressStrategy = remoteAddressStrategy;
}
public PlainAccessResource getPlainAccessResource() {
return plainAccessResource;
}
public void setPlainAccessResource(PlainAccessResource plainAccessResource) {
this.plainAccessResource = plainAccessResource;
}
public RemoteAddressStrategy getRemoteAddressStrategy() {
return remoteAddressStrategy;
}
public void setRemoteAddressStrategy(RemoteAddressStrategy remoteAddressStrategy) {
this.remoteAddressStrategy = remoteAddressStrategy;
}
public Map<Integer, Boolean> getAuthority() {
return authority;
}
public void setAuthority(Map<Integer, Boolean> authority) {
this.authority = authority;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("AuthenticationInfo [plainAccessResource=").append(plainAccessResource).append(", remoteAddressStrategy=")
.append(remoteAddressStrategy).append(", authority={");
Iterator<Entry<Integer, Boolean>> it = authority.entrySet().iterator();
while (it.hasNext()) {
Entry<Integer, Boolean> e = it.next();
if (!e.getValue()) {
builder.append(e.getKey().toString()).append(":").append(e.getValue()).append(",");
}
}
builder.append("}]");
return builder.toString();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plain;
@Deprecated
public class AuthenticationResult {
private PlainAccessResource plainAccessResource;
private boolean succeed;
private Exception exception;
private String resultString;
public PlainAccessResource getPlainAccessResource() {
return plainAccessResource;
}
public void setPlainAccessResource(PlainAccessResource plainAccessResource) {
this.plainAccessResource = plainAccessResource;
}
public boolean isSucceed() {
return succeed;
}
public void setSucceed(boolean succeed) {
this.succeed = succeed;
}
public Exception getException() {
return exception;
}
public void setException(Exception exception) {
this.exception = exception;
}
public String getResultString() {
return resultString;
}
public void setResultString(String resultString) {
this.resultString = resultString;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plain;
import java.util.HashSet;
import java.util.Set;
@Deprecated
public class BrokerAccessControl extends PlainAccessResource {
private boolean admin;
private Set<String> permitSendTopic = new HashSet<>();
private Set<String> noPermitSendTopic = new HashSet<>();
private Set<String> permitPullTopic = new HashSet<>();
private Set<String> noPermitPullTopic = new HashSet<>();
private boolean sendMessage = true;
private boolean sendMessageV2 = true;
private boolean sendBatchMessage = true;
private boolean consumerSendMsgBack = true;
private boolean pullMessage = true;
private boolean queryMessage = true;
private boolean viewMessageById = true;
private boolean heartBeat = true;
private boolean unregisterClient = true;
private boolean checkClientConfig = true;
private boolean getConsumerListByGroup = true;
private boolean updateConsumerOffset = true;
private boolean queryConsumerOffset = true;
private boolean endTransaction = true;
private boolean updateAndCreateTopic = false;
private boolean deleteTopicInbroker = false;
private boolean getAllTopicConfig = true;
private boolean updateBrokerConfig = false;
private boolean getBrokerConfig = true;
private boolean searchOffsetByTimestamp = true;
private boolean getMaxOffset = true;
private boolean getMinOffset = true;
private boolean getEarliestMsgStoretime = true;
private boolean getBrokerRuntimeInfo = true;
private boolean lockBatchMQ = true;
private boolean unlockBatchMQ = true;
private boolean updateAndCreateSubscriptiongroup = false;
private boolean getAllSubscriptiongroupConfig = true;
private boolean deleteSubscriptiongroup = false;
private boolean getTopicStatsInfo = true;
private boolean getConsumerConnectionList = true;
private boolean getProducerConnectionList = true;
private boolean getConsumeStats = true;
private boolean getAllConsumerOffset = true;
private boolean getAllDelayOffset = true;
private boolean invokeBrokerToresetOffset = true;
private boolean queryTopicConsumeByWho = true;
private boolean registerFilterServer = true;
private boolean queryConsumeTimeSpan = true;
private boolean getSystemTopicListFromBroker = true;
private boolean cleanExpiredConsumequeue = true;
private boolean cleanUnusedTopic = true;
private boolean getConsumerRunningInfo = true;
private boolean queryCorrectionOffset = true;
private boolean consumeMessageDirectly = true;
private boolean cloneGroupOffset = true;
private boolean viewBrokerStatsData = true;
private boolean getBrokerConsumeStats = true;
private boolean queryConsumeQueue = true;
public BrokerAccessControl() {
}
public boolean isAdmin() {
return admin;
}
public void setAdmin(boolean admin) {
this.admin = admin;
}
public Set<String> getPermitSendTopic() {
return permitSendTopic;
}
public void setPermitSendTopic(Set<String> permitSendTopic) {
this.permitSendTopic = permitSendTopic;
}
public Set<String> getNoPermitSendTopic() {
return noPermitSendTopic;
}
public void setNoPermitSendTopic(Set<String> noPermitSendTopic) {
this.noPermitSendTopic = noPermitSendTopic;
}
public Set<String> getPermitPullTopic() {
return permitPullTopic;
}
public void setPermitPullTopic(Set<String> permitPullTopic) {
this.permitPullTopic = permitPullTopic;
}
public Set<String> getNoPermitPullTopic() {
return noPermitPullTopic;
}
public void setNoPermitPullTopic(Set<String> noPermitPullTopic) {
this.noPermitPullTopic = noPermitPullTopic;
}
public boolean isSendMessage() {
return sendMessage;
}
public void setSendMessage(boolean sendMessage) {
this.sendMessage = sendMessage;
}
public boolean isSendMessageV2() {
return sendMessageV2;
}
public void setSendMessageV2(boolean sendMessageV2) {
this.sendMessageV2 = sendMessageV2;
}
public boolean isSendBatchMessage() {
return sendBatchMessage;
}
public void setSendBatchMessage(boolean sendBatchMessage) {
this.sendBatchMessage = sendBatchMessage;
}
public boolean isConsumerSendMsgBack() {
return consumerSendMsgBack;
}
public void setConsumerSendMsgBack(boolean consumerSendMsgBack) {
this.consumerSendMsgBack = consumerSendMsgBack;
}
public boolean isPullMessage() {
return pullMessage;
}
public void setPullMessage(boolean pullMessage) {
this.pullMessage = pullMessage;
}
public boolean isQueryMessage() {
return queryMessage;
}
public void setQueryMessage(boolean queryMessage) {
this.queryMessage = queryMessage;
}
public boolean isViewMessageById() {
return viewMessageById;
}
public void setViewMessageById(boolean viewMessageById) {
this.viewMessageById = viewMessageById;
}
public boolean isHeartBeat() {
return heartBeat;
}
public void setHeartBeat(boolean heartBeat) {
this.heartBeat = heartBeat;
}
public boolean isUnregisterClient() {
return unregisterClient;
}
public void setUnregisterClient(boolean unregisterClient) {
this.unregisterClient = unregisterClient;
}
public boolean isCheckClientConfig() {
return checkClientConfig;
}
public void setCheckClientConfig(boolean checkClientConfig) {
this.checkClientConfig = checkClientConfig;
}
public boolean isGetConsumerListByGroup() {
return getConsumerListByGroup;
}
public void setGetConsumerListByGroup(boolean getConsumerListByGroup) {
this.getConsumerListByGroup = getConsumerListByGroup;
}
public boolean isUpdateConsumerOffset() {
return updateConsumerOffset;
}
public void setUpdateConsumerOffset(boolean updateConsumerOffset) {
this.updateConsumerOffset = updateConsumerOffset;
}
public boolean isQueryConsumerOffset() {
return queryConsumerOffset;
}
public void setQueryConsumerOffset(boolean queryConsumerOffset) {
this.queryConsumerOffset = queryConsumerOffset;
}
public boolean isEndTransaction() {
return endTransaction;
}
public void setEndTransaction(boolean endTransaction) {
this.endTransaction = endTransaction;
}
public boolean isUpdateAndCreateTopic() {
return updateAndCreateTopic;
}
public void setUpdateAndCreateTopic(boolean updateAndCreateTopic) {
this.updateAndCreateTopic = updateAndCreateTopic;
}
public boolean isDeleteTopicInbroker() {
return deleteTopicInbroker;
}
public void setDeleteTopicInbroker(boolean deleteTopicInbroker) {
this.deleteTopicInbroker = deleteTopicInbroker;
}
public boolean isGetAllTopicConfig() {
return getAllTopicConfig;
}
public void setGetAllTopicConfig(boolean getAllTopicConfig) {
this.getAllTopicConfig = getAllTopicConfig;
}
public boolean isUpdateBrokerConfig() {
return updateBrokerConfig;
}
public void setUpdateBrokerConfig(boolean updateBrokerConfig) {
this.updateBrokerConfig = updateBrokerConfig;
}
public boolean isGetBrokerConfig() {
return getBrokerConfig;
}
public void setGetBrokerConfig(boolean getBrokerConfig) {
this.getBrokerConfig = getBrokerConfig;
}
public boolean isSearchOffsetByTimestamp() {
return searchOffsetByTimestamp;
}
public void setSearchOffsetByTimestamp(boolean searchOffsetByTimestamp) {
this.searchOffsetByTimestamp = searchOffsetByTimestamp;
}
public boolean isGetMaxOffset() {
return getMaxOffset;
}
public void setGetMaxOffset(boolean getMinOffset) {
this.getMaxOffset = getMinOffset;
}
public boolean isGetMinOffset() {
return getMinOffset;
}
public void setGetMinOffset(boolean getMinOffset) {
this.getMinOffset = getMinOffset;
}
public boolean isGetEarliestMsgStoretime() {
return getEarliestMsgStoretime;
}
public void setGetEarliestMsgStoretime(boolean getEarliestMsgStoretime) {
this.getEarliestMsgStoretime = getEarliestMsgStoretime;
}
public boolean isGetBrokerRuntimeInfo() {
return getBrokerRuntimeInfo;
}
public void setGetBrokerRuntimeInfo(boolean getBrokerRuntimeInfo) {
this.getBrokerRuntimeInfo = getBrokerRuntimeInfo;
}
public boolean isLockBatchMQ() {
return lockBatchMQ;
}
public void setLockBatchMQ(boolean lockBatchMQ) {
this.lockBatchMQ = lockBatchMQ;
}
public boolean isUnlockBatchMQ() {
return unlockBatchMQ;
}
public void setUnlockBatchMQ(boolean unlockBatchMQ) {
this.unlockBatchMQ = unlockBatchMQ;
}
public boolean isUpdateAndCreateSubscriptiongroup() {
return updateAndCreateSubscriptiongroup;
}
public void setUpdateAndCreateSubscriptiongroup(boolean updateAndCreateSubscriptiongroup) {
this.updateAndCreateSubscriptiongroup = updateAndCreateSubscriptiongroup;
}
public boolean isGetAllSubscriptiongroupConfig() {
return getAllSubscriptiongroupConfig;
}
public void setGetAllSubscriptiongroupConfig(boolean getAllSubscriptiongroupConfig) {
this.getAllSubscriptiongroupConfig = getAllSubscriptiongroupConfig;
}
public boolean isDeleteSubscriptiongroup() {
return deleteSubscriptiongroup;
}
public void setDeleteSubscriptiongroup(boolean deleteSubscriptiongroup) {
this.deleteSubscriptiongroup = deleteSubscriptiongroup;
}
public boolean isGetTopicStatsInfo() {
return getTopicStatsInfo;
}
public void setGetTopicStatsInfo(boolean getTopicStatsInfo) {
this.getTopicStatsInfo = getTopicStatsInfo;
}
public boolean isGetConsumerConnectionList() {
return getConsumerConnectionList;
}
public void setGetConsumerConnectionList(boolean getConsumerConnectionList) {
this.getConsumerConnectionList = getConsumerConnectionList;
}
public boolean isGetProducerConnectionList() {
return getProducerConnectionList;
}
public void setGetProducerConnectionList(boolean getProducerConnectionList) {
this.getProducerConnectionList = getProducerConnectionList;
}
public boolean isGetConsumeStats() {
return getConsumeStats;
}
public void setGetConsumeStats(boolean getConsumeStats) {
this.getConsumeStats = getConsumeStats;
}
public boolean isGetAllConsumerOffset() {
return getAllConsumerOffset;
}
public void setGetAllConsumerOffset(boolean getAllConsumerOffset) {
this.getAllConsumerOffset = getAllConsumerOffset;
}
public boolean isGetAllDelayOffset() {
return getAllDelayOffset;
}
public void setGetAllDelayOffset(boolean getAllDelayOffset) {
this.getAllDelayOffset = getAllDelayOffset;
}
public boolean isInvokeBrokerToresetOffset() {
return invokeBrokerToresetOffset;
}
public void setInvokeBrokerToresetOffset(boolean invokeBrokerToresetOffset) {
this.invokeBrokerToresetOffset = invokeBrokerToresetOffset;
}
public boolean isQueryTopicConsumeByWho() {
return queryTopicConsumeByWho;
}
public void setQueryTopicConsumeByWho(boolean queryTopicConsumeByWho) {
this.queryTopicConsumeByWho = queryTopicConsumeByWho;
}
public boolean isRegisterFilterServer() {
return registerFilterServer;
}
public void setRegisterFilterServer(boolean registerFilterServer) {
this.registerFilterServer = registerFilterServer;
}
public boolean isQueryConsumeTimeSpan() {
return queryConsumeTimeSpan;
}
public void setQueryConsumeTimeSpan(boolean queryConsumeTimeSpan) {
this.queryConsumeTimeSpan = queryConsumeTimeSpan;
}
public boolean isGetSystemTopicListFromBroker() {
return getSystemTopicListFromBroker;
}
public void setGetSystemTopicListFromBroker(boolean getSystemTopicListFromBroker) {
this.getSystemTopicListFromBroker = getSystemTopicListFromBroker;
}
public boolean isCleanExpiredConsumequeue() {
return cleanExpiredConsumequeue;
}
public void setCleanExpiredConsumequeue(boolean cleanExpiredConsumequeue) {
this.cleanExpiredConsumequeue = cleanExpiredConsumequeue;
}
public boolean isCleanUnusedTopic() {
return cleanUnusedTopic;
}
public void setCleanUnusedTopic(boolean cleanUnusedTopic) {
this.cleanUnusedTopic = cleanUnusedTopic;
}
public boolean isGetConsumerRunningInfo() {
return getConsumerRunningInfo;
}
public void setGetConsumerRunningInfo(boolean getConsumerRunningInfo) {
this.getConsumerRunningInfo = getConsumerRunningInfo;
}
public boolean isQueryCorrectionOffset() {
return queryCorrectionOffset;
}
public void setQueryCorrectionOffset(boolean queryCorrectionOffset) {
this.queryCorrectionOffset = queryCorrectionOffset;
}
public boolean isConsumeMessageDirectly() {
return consumeMessageDirectly;
}
public void setConsumeMessageDirectly(boolean consumeMessageDirectly) {
this.consumeMessageDirectly = consumeMessageDirectly;
}
public boolean isCloneGroupOffset() {
return cloneGroupOffset;
}
public void setCloneGroupOffset(boolean cloneGroupOffset) {
this.cloneGroupOffset = cloneGroupOffset;
}
public boolean isViewBrokerStatsData() {
return viewBrokerStatsData;
}
public void setViewBrokerStatsData(boolean viewBrokerStatsData) {
this.viewBrokerStatsData = viewBrokerStatsData;
}
public boolean isGetBrokerConsumeStats() {
return getBrokerConsumeStats;
}
public void setGetBrokerConsumeStats(boolean getBrokerConsumeStats) {
this.getBrokerConsumeStats = getBrokerConsumeStats;
}
public boolean isQueryConsumeQueue() {
return queryConsumeQueue;
}
public void setQueryConsumeQueue(boolean queryConsumeQueue) {
this.queryConsumeQueue = queryConsumeQueue;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("BorkerAccessControl [permitSendTopic=").append(permitSendTopic).append(", noPermitSendTopic=")
.append(noPermitSendTopic).append(", permitPullTopic=").append(permitPullTopic)
.append(", noPermitPullTopic=").append(noPermitPullTopic);
if (!!sendMessage)
builder.append(", sendMessage=").append(sendMessage);
if (!!sendMessageV2)
builder.append(", sendMessageV2=").append(sendMessageV2);
if (!sendBatchMessage)
builder.append(", sendBatchMessage=").append(sendBatchMessage);
if (!consumerSendMsgBack)
builder.append(", consumerSendMsgBack=").append(consumerSendMsgBack);
if (!pullMessage)
builder.append(", pullMessage=").append(pullMessage);
if (!queryMessage)
builder.append(", queryMessage=").append(queryMessage);
if (!viewMessageById)
builder.append(", viewMessageById=").append(viewMessageById);
if (!heartBeat)
builder.append(", heartBeat=").append(heartBeat);
if (!unregisterClient)
builder.append(", unregisterClient=").append(unregisterClient);
if (!checkClientConfig)
builder.append(", checkClientConfig=").append(checkClientConfig);
if (!getConsumerListByGroup)
builder.append(", getConsumerListByGroup=").append(getConsumerListByGroup);
if (!updateConsumerOffset)
builder.append(", updateConsumerOffset=").append(updateConsumerOffset);
if (!queryConsumerOffset)
builder.append(", queryConsumerOffset=").append(queryConsumerOffset);
if (!endTransaction)
builder.append(", endTransaction=").append(endTransaction);
if (!updateAndCreateTopic)
builder.append(", updateAndCreateTopic=").append(updateAndCreateTopic);
if (!deleteTopicInbroker)
builder.append(", deleteTopicInbroker=").append(deleteTopicInbroker);
if (!getAllTopicConfig)
builder.append(", getAllTopicConfig=").append(getAllTopicConfig);
if (!updateBrokerConfig)
builder.append(", updateBrokerConfig=").append(updateBrokerConfig);
if (!getBrokerConfig)
builder.append(", getBrokerConfig=").append(getBrokerConfig);
if (!searchOffsetByTimestamp)
builder.append(", searchOffsetByTimestamp=").append(searchOffsetByTimestamp);
if (!getMaxOffset)
builder.append(", getMaxOffset=").append(getMaxOffset);
if (!getMinOffset)
builder.append(", getMixOffset=").append(getMinOffset);
if (!getEarliestMsgStoretime)
builder.append(", getEarliestMsgStoretime=").append(getEarliestMsgStoretime);
if (!getBrokerRuntimeInfo)
builder.append(", getBrokerRuntimeInfo=").append(getBrokerRuntimeInfo);
if (!lockBatchMQ)
builder.append(", lockBatchMQ=").append(lockBatchMQ);
if (!unlockBatchMQ)
builder.append(", unlockBatchMQ=").append(unlockBatchMQ);
if (!updateAndCreateSubscriptiongroup)
builder.append(", updateAndCreateSubscriptiongroup=").append(updateAndCreateSubscriptiongroup);
if (!getAllSubscriptiongroupConfig)
builder.append(", getAllSubscriptiongroupConfig=").append(getAllSubscriptiongroupConfig);
if (!deleteSubscriptiongroup)
builder.append(", deleteSubscriptiongroup=").append(deleteSubscriptiongroup);
if (!getTopicStatsInfo)
builder.append(", getTopicStatsInfo=").append(getTopicStatsInfo);
if (!getConsumerConnectionList)
builder.append(", getConsumerConnectionList=").append(getConsumerConnectionList);
if (!getProducerConnectionList)
builder.append(", getProducerConnectionList=").append(getProducerConnectionList);
if (!getConsumeStats)
builder.append(", getConsumeStats=").append(getConsumeStats);
if (!getAllConsumerOffset)
builder.append(", getAllConsumerOffset=").append(getAllConsumerOffset);
if (!getAllDelayOffset)
builder.append(", getAllDelayOffset=").append(getAllDelayOffset);
if (!invokeBrokerToresetOffset)
builder.append(", invokeBrokerToresetOffset=").append(invokeBrokerToresetOffset);
if (!queryTopicConsumeByWho)
builder.append(", queryTopicConsumeByWho=").append(queryTopicConsumeByWho);
if (!registerFilterServer)
builder.append(", registerFilterServer=").append(registerFilterServer);
if (!queryConsumeTimeSpan)
builder.append(", queryConsumeTimeSpan=").append(queryConsumeTimeSpan);
if (!getSystemTopicListFromBroker)
builder.append(", getSystemTopicListFromBroker=").append(getSystemTopicListFromBroker);
if (!cleanExpiredConsumequeue)
builder.append(", cleanExpiredConsumequeue=").append(cleanExpiredConsumequeue);
if (!getConsumerRunningInfo)
builder.append(", cleanUnusedTopic=").append(getConsumerRunningInfo);
if (!getConsumerRunningInfo)
builder.append(", getConsumerRunningInfo=").append(getConsumerRunningInfo);
if (!queryCorrectionOffset)
builder.append(", queryCorrectionOffset=").append(queryCorrectionOffset);
if (!consumeMessageDirectly)
builder.append(", consumeMessageDirectly=").append(consumeMessageDirectly);
if (!cloneGroupOffset)
builder.append(", cloneGroupOffset=").append(cloneGroupOffset);
if (!viewBrokerStatsData)
builder.append(", viewBrokerStatsData=").append(viewBrokerStatsData);
if (!getBrokerConsumeStats)
builder.append(", getBrokerConsumeStats=").append(getBrokerConsumeStats);
if (!queryConsumeQueue)
builder.append(", queryConsumeQueue=").append(queryConsumeQueue);
builder.append("]");
return builder.toString();
}
}
......@@ -23,34 +23,56 @@ import org.apache.rocketmq.acl.AccessResource;
import org.apache.rocketmq.common.MixAll;
public class PlainAccessResource implements AccessResource {
//identify the user
private String accessKey;
private String signature;
//the content to calculate the content
private byte[] content;
private String secretKey;
private String secretToken;
private String whiteRemoteAddress;
private Map<String, Byte> resourcePermMap = new HashMap<>();
private boolean admin;
private String remoteAddr;
private byte defaultTopicPerm = 1;
private String recognition;
private byte defaultGroupPerm = 1;
private Map<String, Byte> resourcePermMap;
private RemoteAddressStrategy remoteAddressStrategy;
private int requestCode;
//the content to calculate the content
private byte[] content;
private String signature;
private String secretToken;
@Deprecated
private String topic;
private String recognition;
public PlainAccessResource() {
}
public static boolean isRetryTopic(String topic) {
return (null != topic && topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX));
}
public static String getRetryTopic(String group) {
if (group == null) {
return null;
}
return MixAll.getRetryTopic(group);
}
public void addResourceAndPerm(String resource, byte perm) {
if (resource == null) {
return;
}
if (resourcePermMap == null) {
resourcePermMap = new HashMap<>();
}
resourcePermMap.put(resource, perm);
}
......@@ -62,20 +84,48 @@ public class PlainAccessResource implements AccessResource {
this.accessKey = accessKey;
}
public String getSignature() {
return signature;
public String getSecretKey() {
return secretKey;
}
public void setSignature(String signature) {
this.signature = signature;
public void setSecretKey(String secretKey) {
this.secretKey = secretKey;
}
public String getWhiteRemoteAddress() {
return whiteRemoteAddress;
}
public String getRemoteAddr() {
return remoteAddr;
public void setWhiteRemoteAddress(String whiteRemoteAddress) {
this.whiteRemoteAddress = whiteRemoteAddress;
}
public void setRemoteAddr(String remoteAddr) {
this.remoteAddr = remoteAddr;
public boolean isAdmin() {
return admin;
}
public void setAdmin(boolean admin) {
this.admin = admin;
}
public byte getDefaultTopicPerm() {
return defaultTopicPerm;
}
public void setDefaultTopicPerm(byte defaultTopicPerm) {
this.defaultTopicPerm = defaultTopicPerm;
}
public byte getDefaultGroupPerm() {
return defaultGroupPerm;
}
public void setDefaultGroupPerm(byte defaultGroupPerm) {
this.defaultGroupPerm = defaultGroupPerm;
}
public Map<String, Byte> getResourcePermMap() {
return resourcePermMap;
}
public String getRecognition() {
......@@ -94,14 +144,6 @@ public class PlainAccessResource implements AccessResource {
this.requestCode = requestCode;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getSecretToken() {
return secretToken;
}
......@@ -110,21 +152,25 @@ public class PlainAccessResource implements AccessResource {
this.secretToken = secretToken;
}
@Override
public String toString() {
return ToStringBuilder.reflectionToString(this);
public RemoteAddressStrategy getRemoteAddressStrategy() {
return remoteAddressStrategy;
}
public void setRemoteAddressStrategy(RemoteAddressStrategy remoteAddressStrategy) {
this.remoteAddressStrategy = remoteAddressStrategy;
}
public static boolean isRetryTopic(String topic) {
return (null != topic && topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX));
public String getSignature() {
return signature;
}
public static String getRetryTopic(String group) {
if (group == null) {
return null;
public void setSignature(String signature) {
this.signature = signature;
}
return MixAll.getRetryTopic(group);
@Override
public String toString() {
return ToStringBuilder.reflectionToString(this);
}
public byte[] getContent() {
......
......@@ -21,8 +21,8 @@ import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.rocketmq.acl.AccessResource;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.acl.common.AclUtils;
import org.apache.rocketmq.acl.common.AclException;
import org.apache.rocketmq.acl.common.AclUtils;
import org.apache.rocketmq.acl.common.Permission;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.common.protocol.RequestCode;
......@@ -47,7 +47,7 @@ public class PlainAccessValidator implements AccessValidator {
@Override
public AccessResource parse(RemotingCommand request, String remoteAddr) {
PlainAccessResource accessResource = new PlainAccessResource();
accessResource.setRemoteAddr(remoteAddr);
accessResource.setWhiteRemoteAddress(remoteAddr);
accessResource.setRequestCode(request.getCode());
accessResource.setAccessKey(request.getExtFields().get(SessionCredentials.AccessKey));
accessResource.setSignature(request.getExtFields().get(SessionCredentials.Signature));
......@@ -77,7 +77,7 @@ public class PlainAccessValidator implements AccessValidator {
HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
accessResource.addResourceAndPerm(getRetryTopic(data.getGroupName()), Permission.SUB);
for (SubscriptionData subscriptionData: data.getSubscriptionDataSet()) {
for (SubscriptionData subscriptionData : data.getSubscriptionDataSet()) {
accessResource.addResourceAndPerm(subscriptionData.getTopic(), Permission.SUB);
}
}
......@@ -106,10 +106,8 @@ public class PlainAccessValidator implements AccessValidator {
}
} catch (Throwable t) {
throw new AclException(t.getMessage(), -1, t);
throw new AclException(t.getMessage(), t);
}
// content
SortedMap<String, String> map = new TreeMap<String, String>();
for (Map.Entry<String, String> entry : request.getExtFields().entrySet()) {
......@@ -118,26 +116,12 @@ public class PlainAccessValidator implements AccessValidator {
}
}
accessResource.setContent(AclUtils.combineRequestContent(request, map));
return accessResource;
}
@Override
public void validate(AccessResource accessResource) {
AuthenticationResult authenticationResult = null;
try {
authenticationResult = aclPlugEngine.eachCheckAuthentication((PlainAccessResource) accessResource);
if (authenticationResult.isSucceed())
return;
} catch (Exception e) {
throw new AclPlugRuntimeException(String.format("validate exception AccessResource data %s", accessResource.toString()), e);
}
if (authenticationResult.getException() != null) {
throw new AclPlugRuntimeException(String.format("eachCheck the inspection appear exception, accessControl data is %s", accessResource.toString()), authenticationResult.getException());
}
if (authenticationResult.getPlainAccessResource() != null || !authenticationResult.isSucceed()) {
throw new AclPlugRuntimeException(String.format("%s accessControl data is %s", authenticationResult.getResultString(), accessResource.toString()));
}
aclPlugEngine.eachCheckPlainAccessResource((PlainAccessResource) accessResource);
}
}
......@@ -16,8 +16,9 @@
*/
package org.apache.rocketmq.acl.plain;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.Paths;
......@@ -32,11 +33,12 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclException;
import org.apache.rocketmq.acl.common.AclUtils;
import org.apache.rocketmq.acl.common.Permission;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
......@@ -47,15 +49,14 @@ public class PlainPermissionLoader {
private String fileHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY,
System.getenv(MixAll.ROCKETMQ_HOME_ENV));
private Map<String/** account **/, List<AuthenticationInfo>> accessControlMap = new HashMap<>();
private String fileName = System.getProperty("romcketmq.acl.plain.fileName", "/conf/transport.yml");
private AuthenticationInfo authenticationInfo;
private Map<String/** account **/
, List<PlainAccessResource>> plainAccessResourceMap = new HashMap<>();
private RemoteAddressStrategyFactory remoteAddressStrategyFactory = new RemoteAddressStrategyFactory();
private AccessContralAnalysis accessContralAnalysis = new AccessContralAnalysis();
private List<RemoteAddressStrategy> globalWhiteRemoteAddressStrategy = new ArrayList<>();
private Class<?> accessContralAnalysisClass = RequestCode.class;
private RemoteAddressStrategyFactory remoteAddressStrategyFactory = new RemoteAddressStrategyFactory();
private boolean isWatchStart;
......@@ -65,13 +66,26 @@ public class PlainPermissionLoader {
}
public void initialize() {
BrokerAccessControlTransport accessControlTransport = AclUtils.getYamlDataObject(fileHome + "/conf/transport.yml", BrokerAccessControlTransport.class);
if (accessControlTransport == null) {
throw new AclPlugRuntimeException("transport.yml file is no data");
JSONObject accessControlTransport = AclUtils.getYamlDataObject(fileHome + fileName,
JSONObject.class);
if (accessControlTransport == null || accessControlTransport.isEmpty()) {
throw new AclException("transport.yml file is not data");
}
log.info("BorkerAccessControlTransport data is : ", accessControlTransport.toString());
accessContralAnalysis.analysisClass(accessContralAnalysisClass);
setBrokerAccessControlTransport(accessControlTransport);
JSONArray globalWhiteRemoteAddressesList = accessControlTransport.getJSONArray("globalWhiteRemoteAddresses");
if (globalWhiteRemoteAddressesList != null && !globalWhiteRemoteAddressesList.isEmpty()) {
for (int i = 0; i < globalWhiteRemoteAddressesList.size(); i++) {
setGlobalWhite(globalWhiteRemoteAddressesList.getString(i));
}
}
JSONArray accounts = accessControlTransport.getJSONArray("accounts");
if (accounts != null && !accounts.isEmpty()) {
for (int i = 0; i < accounts.size(); i++) {
this.setPlainAccessResource(getPlainAccessResource(accounts.getJSONObject(i)));
}
}
}
private void watch() {
......@@ -95,8 +109,9 @@ public class PlainPermissionLoader {
WatchKey watchKey = watcher.take();
List<WatchEvent<?>> watchEvents = watchKey.pollEvents();
for (WatchEvent<?> event : watchEvents) {
if ("transport.yml".equals(event.context().toString()) &&
(StandardWatchEventKinds.ENTRY_MODIFY.equals(event.kind()) || StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind()))) {
if ("transport.yml".equals(event.context().toString())
&& (StandardWatchEventKinds.ENTRY_MODIFY.equals(event.kind())
|| StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind()))) {
log.info("transprot.yml make a difference change is : ", event.toString());
PlainPermissionLoader.this.cleanAuthenticationInfo();
initialize();
......@@ -124,234 +139,115 @@ public class PlainPermissionLoader {
}
}
private void handleAccessControl(PlainAccessResource plainAccessResource) {
if (plainAccessResource instanceof BrokerAccessControl) {
BrokerAccessControl brokerAccessControl = (BrokerAccessControl) plainAccessResource;
if (brokerAccessControl.isAdmin()) {
brokerAccessControl.setUpdateAndCreateSubscriptiongroup(true);
brokerAccessControl.setDeleteSubscriptiongroup(true);
brokerAccessControl.setUpdateAndCreateTopic(true);
brokerAccessControl.setDeleteTopicInbroker(true);
brokerAccessControl.setUpdateBrokerConfig(true);
PlainAccessResource getPlainAccessResource(JSONObject account) {
PlainAccessResource plainAccessResource = new PlainAccessResource();
plainAccessResource.setAccessKey(account.getString("accessKey"));
plainAccessResource.setSecretKey(account.getString("secretKey"));
plainAccessResource.setWhiteRemoteAddress(account.getString("whiteRemoteAddress"));
plainAccessResource.setAdmin(account.containsKey("admin") ? account.getBoolean("admin") : false);
plainAccessResource.setDefaultGroupPerm(Permission.fromStringGetPermission(account.getString("defaultGroupPerm")));
plainAccessResource.setDefaultTopicPerm(Permission.fromStringGetPermission(account.getString("defaultTopicPerm")));
Permission.setTopicPerm(plainAccessResource, true, account.getJSONArray("groups"));
Permission.setTopicPerm(plainAccessResource, true, account.getJSONArray("topics"));
return plainAccessResource;
}
void checkPerm(PlainAccessResource needCheckplainAccessResource, PlainAccessResource plainAccessResource) {
if (!plainAccessResource.isAdmin() && Permission.checkAdminCode(needCheckplainAccessResource.getRequestCode())) {
throw new AclException(String.format("accessKey is %s remoteAddress is %s , is not admin Premission . RequestCode is %d", plainAccessResource.getAccessKey(), plainAccessResource.getWhiteRemoteAddress(), needCheckplainAccessResource.getRequestCode()));
}
Map<String, Byte> needCheckTopicAndGourpPerm = needCheckplainAccessResource.getResourcePermMap();
Map<String, Byte> topicAndGourpPerm = plainAccessResource.getResourcePermMap();
Iterator<Entry<String, Byte>> it = topicAndGourpPerm.entrySet().iterator();
Byte perm;
while (it.hasNext()) {
Entry<String, Byte> e = it.next();
if ((perm = needCheckTopicAndGourpPerm.get(e.getKey())) != null && Permission.checkPermission(perm, e.getValue())) {
continue;
}
byte neededPerm = PlainAccessResource.isRetryTopic(e.getKey()) ? needCheckplainAccessResource.getDefaultGroupPerm() :
needCheckplainAccessResource.getDefaultTopicPerm();
if (!Permission.checkPermission(neededPerm, e.getValue())) {
throw new AclException(String.format("", e.toString()));
}
}
}
void cleanAuthenticationInfo() {
accessControlMap.clear();
authenticationInfo = null;
this.plainAccessResourceMap.clear();
this.globalWhiteRemoteAddressStrategy.clear();
}
public void setAccessControl(PlainAccessResource plainAccessResource) throws AclPlugRuntimeException {
if (plainAccessResource.getAccessKey() == null || plainAccessResource.getSignature() == null
|| plainAccessResource.getAccessKey().length() <= 6 || plainAccessResource.getSignature().length() <= 6) {
throw new AclPlugRuntimeException(String.format(
public void setPlainAccessResource(PlainAccessResource plainAccessResource) throws AclException {
if (plainAccessResource.getAccessKey() == null || plainAccessResource.getSecretKey() == null
|| plainAccessResource.getAccessKey().length() <= 6
|| plainAccessResource.getSecretKey().length() <= 6) {
throw new AclException(String.format(
"The account password cannot be null and is longer than 6, account is %s password is %s",
plainAccessResource.getAccessKey(), plainAccessResource.getSignature()));
plainAccessResource.getAccessKey(), plainAccessResource.getSecretKey()));
}
try {
handleAccessControl(plainAccessResource);
RemoteAddressStrategy remoteAddressStrategy = remoteAddressStrategyFactory.getNetaddressStrategy(plainAccessResource);
List<AuthenticationInfo> accessControlAddressList = accessControlMap.get(plainAccessResource.getAccessKey());
RemoteAddressStrategy remoteAddressStrategy = remoteAddressStrategyFactory
.getNetaddressStrategy(plainAccessResource);
List<PlainAccessResource> accessControlAddressList = plainAccessResourceMap.get(plainAccessResource.getAccessKey());
if (accessControlAddressList == null) {
accessControlAddressList = new ArrayList<>();
accessControlMap.put(plainAccessResource.getAccessKey(), accessControlAddressList);
plainAccessResourceMap.put(plainAccessResource.getAccessKey(), accessControlAddressList);
}
AuthenticationInfo authenticationInfo = new AuthenticationInfo(
accessContralAnalysis.analysis(plainAccessResource), plainAccessResource, remoteAddressStrategy);
accessControlAddressList.add(authenticationInfo);
log.info("authenticationInfo is {}", authenticationInfo.toString());
plainAccessResource.setRemoteAddressStrategy(remoteAddressStrategy);
accessControlAddressList.add(plainAccessResource);
log.info("authenticationInfo is {}", plainAccessResource.toString());
} catch (Exception e) {
throw new AclPlugRuntimeException(
throw new AclException(
String.format("Exception info %s %s", e.getMessage(), plainAccessResource.toString()), e);
}
}
public void setAccessControlList(List<PlainAccessResource> plainAccessResourceList) throws AclPlugRuntimeException {
for (PlainAccessResource plainAccessResource : plainAccessResourceList) {
setAccessControl(plainAccessResource);
}
}
public void setNetaddressAccessControl(PlainAccessResource plainAccessResource) throws AclPlugRuntimeException {
try {
authenticationInfo = new AuthenticationInfo(accessContralAnalysis.analysis(plainAccessResource), plainAccessResource, remoteAddressStrategyFactory.getNetaddressStrategy(plainAccessResource));
log.info("default authenticationInfo is {}", authenticationInfo.toString());
} catch (Exception e) {
throw new AclPlugRuntimeException(plainAccessResource.toString(), e);
private void setGlobalWhite(String remoteAddresses) {
globalWhiteRemoteAddressStrategy.add(remoteAddressStrategyFactory.getNetaddressStrategy(remoteAddresses));
}
}
public void eachCheckPlainAccessResource(PlainAccessResource plainAccessResource) {
public AuthenticationInfo getAccessControl(PlainAccessResource plainAccessResource) {
if (plainAccessResource.getAccessKey() == null && authenticationInfo != null) {
return authenticationInfo.getRemoteAddressStrategy().match(plainAccessResource) ? authenticationInfo : null;
} else {
List<AuthenticationInfo> accessControlAddressList = accessControlMap.get(plainAccessResource.getAccessKey());
if (accessControlAddressList != null) {
for (AuthenticationInfo ai : accessControlAddressList) {
if (ai.getRemoteAddressStrategy().match(plainAccessResource) && ai.getPlainAccessResource().getSignature().equals(plainAccessResource.getSignature())) {
return ai;
}
}
}
}
return null;
List<PlainAccessResource> plainAccessResourceAddressList = plainAccessResourceMap.get(plainAccessResource.getAccessKey());
boolean isDistinguishAccessKey = false;
if (plainAccessResourceAddressList != null) {
for (PlainAccessResource plainAccess : plainAccessResourceAddressList) {
if (!plainAccess.getRemoteAddressStrategy().match(plainAccessResource)) {
isDistinguishAccessKey = true;
continue;
}
public AuthenticationResult eachCheckAuthentication(PlainAccessResource plainAccessResource) {
AuthenticationResult authenticationResult = new AuthenticationResult();
AuthenticationInfo authenticationInfo = getAccessControl(plainAccessResource);
if (authenticationInfo != null) {
boolean boo = authentication(authenticationInfo, plainAccessResource, authenticationResult);
authenticationResult.setSucceed(boo);
authenticationResult.setPlainAccessResource(authenticationInfo.getPlainAccessResource());
String signature = AclUtils.calSignature(plainAccessResource.getContent(), plainAccess.getSecretKey());
if (signature.equals(plainAccessResource.getSignature())) {
checkPerm(plainAccess, plainAccessResource);
return;
} else {
authenticationResult.setResultString("plainAccessResource is null, Please check login, password, IP\"");
}
return authenticationResult;
}
void setBrokerAccessControlTransport(BrokerAccessControlTransport transport) {
if (transport.getOnlyNetAddress() == null && (transport.getList() == null || transport.getList().size() == 0)) {
throw new AclPlugRuntimeException("onlyNetAddress and list can't be all empty");
}
if (transport.getOnlyNetAddress() != null) {
this.setNetaddressAccessControl(transport.getOnlyNetAddress());
}
if (transport.getList() != null || transport.getList().size() > 0) {
for (BrokerAccessControl accessControl : transport.getList()) {
this.setAccessControl(accessControl);
throw new AclException(String.format("signature is erron. erron accessKe is %s , erron reomiteAddress %s", plainAccess.getAccessKey(), plainAccessResource.getWhiteRemoteAddress()));
}
}
}
public boolean authentication(AuthenticationInfo authenticationInfo, PlainAccessResource plainAccessResource,
AuthenticationResult authenticationResult) {
int code = plainAccessResource.getRequestCode();
if (!authenticationInfo.getAuthority().get(code)) {
authenticationResult.setResultString(String.format("code is %d Authentication failed", code));
return false;
}
if (!(authenticationInfo.getPlainAccessResource() instanceof BrokerAccessControl)) {
return true;
}
BrokerAccessControl borker = (BrokerAccessControl) authenticationInfo.getPlainAccessResource();
String topicName = plainAccessResource.getTopic();
if (code == 10 || code == 310 || code == 320) {
if (borker.getPermitSendTopic().contains(topicName)) {
return true;
}
if (borker.getNoPermitSendTopic().contains(topicName)) {
authenticationResult.setResultString(String.format("noPermitSendTopic include %s", topicName));
return false;
if (plainAccessResource.getAccessKey() == null && !globalWhiteRemoteAddressStrategy.isEmpty()) {
for (RemoteAddressStrategy remoteAddressStrategy : globalWhiteRemoteAddressStrategy) {
if (remoteAddressStrategy.match(plainAccessResource)) {
return;
}
return borker.getPermitSendTopic().isEmpty() ? true : false;
} else if (code == 11) {
if (borker.getPermitPullTopic().contains(topicName)) {
return true;
}
if (borker.getNoPermitPullTopic().contains(topicName)) {
authenticationResult.setResultString(String.format("noPermitPullTopic include %s", topicName));
return false;
}
return borker.getPermitPullTopic().isEmpty() ? true : false;
if (isDistinguishAccessKey) {
throw new AclException(String.format("client ip not in WhiteRemoteAddress . erron accessKe is %s , erron reomiteAddress %s", plainAccessResource.getAccessKey(), plainAccessResource.getWhiteRemoteAddress()));
} else {
throw new AclException(String.format("It is not make Access and make client ip .erron accessKe is %s , erron reomiteAddress %s", plainAccessResource.getAccessKey(), plainAccessResource.getWhiteRemoteAddress()));
}
return true;
}
public boolean isWatchStart() {
return isWatchStart;
}
public static class AccessContralAnalysis {
private Map<Class<?>, Map<Integer, Field>> classTocodeAndMentod = new HashMap<>();
private Map<String, Integer> fieldNameAndCode = new HashMap<>();
public void analysisClass(Class<?> clazz) {
Field[] fields = clazz.getDeclaredFields();
try {
for (Field field : fields) {
if (field.getType().equals(int.class)) {
String name = StringUtils.replace(field.getName(), "_", "").toLowerCase();
fieldNameAndCode.put(name, (Integer) field.get(null));
}
}
} catch (IllegalArgumentException | IllegalAccessException e) {
throw new AclPlugRuntimeException(String.format("analysis on failure Class is %s", clazz.getName()), e);
}
}
public Map<Integer, Boolean> analysis(PlainAccessResource plainAccessResource) {
Class<? extends PlainAccessResource> clazz = plainAccessResource.getClass();
Map<Integer, Field> codeAndField = classTocodeAndMentod.get(clazz);
if (codeAndField == null) {
codeAndField = new HashMap<>();
Field[] fields = clazz.getDeclaredFields();
for (Field field : fields) {
if ("admin".equals(field.getName()))
continue;
if (!field.getType().equals(boolean.class))
continue;
Integer code = fieldNameAndCode.get(field.getName().toLowerCase());
if (code == null) {
throw new AclPlugRuntimeException(
String.format("field nonexistent in code fieldName is %s", field.getName()));
}
field.setAccessible(true);
codeAndField.put(code, field);
}
if (codeAndField.isEmpty()) {
throw new AclPlugRuntimeException(String.format("PlainAccessResource nonexistent code , name %s",
plainAccessResource.getClass().getName()));
}
classTocodeAndMentod.put(clazz, codeAndField);
}
Iterator<Entry<Integer, Field>> it = codeAndField.entrySet().iterator();
Map<Integer, Boolean> authority = new HashMap<>();
try {
while (it.hasNext()) {
Entry<Integer, Field> e = it.next();
authority.put(e.getKey(), (Boolean) e.getValue().get(plainAccessResource));
}
} catch (IllegalArgumentException | IllegalAccessException e) {
throw new AclPlugRuntimeException(
String.format("analysis on failure PlainAccessResource is %s", PlainAccessResource.class.getName()), e);
}
return authority;
}
}
public static class BrokerAccessControlTransport {
private BrokerAccessControl onlyNetAddress;
private List<BrokerAccessControl> list;
public BrokerAccessControl getOnlyNetAddress() {
return onlyNetAddress;
}
public void setOnlyNetAddress(BrokerAccessControl onlyNetAddress) {
this.onlyNetAddress = onlyNetAddress;
}
public List<BrokerAccessControl> getList() {
return list;
}
public void setList(List<BrokerAccessControl> list) {
this.list = list;
}
@Override
public String toString() {
return "BorkerAccessControlTransport [onlyNetAddress=" + onlyNetAddress + ", list=" + list + "]";
}
}
}
......@@ -19,6 +19,7 @@ package org.apache.rocketmq.acl.plain;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclException;
import org.apache.rocketmq.acl.common.AclUtils;
public class RemoteAddressStrategyFactory {
......@@ -26,7 +27,11 @@ public class RemoteAddressStrategyFactory {
public static final NullRemoteAddressStrategy NULL_NET_ADDRESS_STRATEGY = new NullRemoteAddressStrategy();
public RemoteAddressStrategy getNetaddressStrategy(PlainAccessResource plainAccessResource) {
String netaddress = plainAccessResource.getRemoteAddr();
return getNetaddressStrategy(plainAccessResource.getWhiteRemoteAddress());
}
public RemoteAddressStrategy getNetaddressStrategy(String netaddress) {
if (StringUtils.isBlank(netaddress) || "*".equals(netaddress)) {
return NULL_NET_ADDRESS_STRATEGY;
}
......@@ -34,7 +39,7 @@ public class RemoteAddressStrategyFactory {
String[] strArray = StringUtils.split(netaddress, ".");
String four = strArray[3];
if (!four.startsWith("{")) {
throw new AclPlugRuntimeException(String.format("MultipleRemoteAddressStrategy netaddress examine scope Exception netaddress", netaddress));
throw new AclException(String.format("MultipleRemoteAddressStrategy netaddress examine scope Exception netaddress", netaddress));
}
return new MultipleRemoteAddressStrategy(AclUtils.getAddreeStrArray(netaddress, four));
} else if (AclUtils.isColon(netaddress)) {
......@@ -67,7 +72,7 @@ public class RemoteAddressStrategyFactory {
@Override
public boolean match(PlainAccessResource plainAccessResource) {
return multipleSet.contains(plainAccessResource.getRemoteAddr());
return multipleSet.contains(plainAccessResource.getWhiteRemoteAddress());
}
}
......@@ -83,7 +88,7 @@ public class RemoteAddressStrategyFactory {
@Override
public boolean match(PlainAccessResource plainAccessResource) {
return netaddress.equals(plainAccessResource.getRemoteAddr());
return netaddress.equals(plainAccessResource.getWhiteRemoteAddress());
}
}
......@@ -117,14 +122,14 @@ public class RemoteAddressStrategyFactory {
setValue(0, 255);
} else if (AclUtils.isMinus(value)) {
if (value.indexOf("-") == 0) {
throw new AclPlugRuntimeException(String.format("RangeRemoteAddressStrategy netaddress examine scope Exception value %s ", value));
throw new AclException(String.format("RangeRemoteAddressStrategy netaddress examine scope Exception value %s ", value));
}
String[] valueArray = StringUtils.split(value, "-");
this.start = Integer.valueOf(valueArray[0]);
this.end = Integer.valueOf(valueArray[1]);
if (!(AclUtils.isScope(end) && AclUtils.isScope(start) && start <= end)) {
throw new AclPlugRuntimeException(String.format("RangeRemoteAddressStrategy netaddress examine scope Exception start is %s , end is %s", start, end));
throw new AclException(String.format("RangeRemoteAddressStrategy netaddress examine scope Exception start is %s , end is %s", start, end));
}
}
return this.end > 0 ? true : false;
......@@ -137,7 +142,7 @@ public class RemoteAddressStrategyFactory {
@Override
public boolean match(PlainAccessResource plainAccessResource) {
String netAddress = plainAccessResource.getRemoteAddr();
String netAddress = plainAccessResource.getWhiteRemoteAddress();
if (netAddress.startsWith(this.head)) {
String value;
if (index == 3) {
......
......@@ -14,12 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plain;
package org.apache.rocketmq.acl.common;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclUtils;
import org.junit.Assert;
import org.junit.Test;
......@@ -125,7 +125,17 @@ public class AclUtilsTest {
Assert.assertFalse(isMinus);
}
@SuppressWarnings("unchecked")
@Test
public void getYamlDataObjectTest() {
Map<String, Object> map = AclUtils.getYamlDataObject("src/test/resources/conf/transport.yml", Map.class);
Assert.assertFalse(map.isEmpty());
}
@Test(expected = Exception.class)
public void getYamlDataObjectExceptionTest() {
AclUtils.getYamlDataObject("transport.yml", Map.class);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.common;
import com.alibaba.fastjson.JSONArray;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.rocketmq.acl.plain.PlainAccessResource;
import org.junit.Assert;
import org.junit.Test;
public class PermissionTest {
@Test
public void fromStringGetPermissionTest() {
byte perm = Permission.fromStringGetPermission("PUB");
Assert.assertEquals(perm, Permission.PUB);
perm = Permission.fromStringGetPermission("SUB");
Assert.assertEquals(perm, Permission.SUB);
perm = Permission.fromStringGetPermission("ANY");
Assert.assertEquals(perm, Permission.ANY);
perm = Permission.fromStringGetPermission("PUB|SUB");
Assert.assertEquals(perm, Permission.ANY);
perm = Permission.fromStringGetPermission("SUB|PUB");
Assert.assertEquals(perm, Permission.ANY);
perm = Permission.fromStringGetPermission("DENY");
Assert.assertEquals(perm, Permission.DENY);
perm = Permission.fromStringGetPermission("1");
Assert.assertEquals(perm, Permission.DENY);
perm = Permission.fromStringGetPermission(null);
Assert.assertEquals(perm, Permission.DENY);
}
@Test
public void checkPermissionTest() {
boolean boo = Permission.checkPermission(Permission.DENY, Permission.DENY);
Assert.assertFalse(boo);
boo = Permission.checkPermission(Permission.PUB, Permission.PUB);
Assert.assertTrue(boo);
boo = Permission.checkPermission(Permission.SUB, Permission.SUB);
Assert.assertTrue(boo);
boo = Permission.checkPermission(Permission.ANY, Permission.ANY);
Assert.assertFalse(boo);
boo = Permission.checkPermission(Permission.ANY, Permission.SUB);
Assert.assertTrue(boo);
boo = Permission.checkPermission(Permission.ANY, Permission.PUB);
Assert.assertTrue(boo);
boo = Permission.checkPermission(Permission.DENY, Permission.ANY);
Assert.assertFalse(boo);
boo = Permission.checkPermission(Permission.DENY, Permission.PUB);
Assert.assertFalse(boo);
boo = Permission.checkPermission(Permission.DENY, Permission.SUB);
Assert.assertFalse(boo);
}
@Test(expected = AclException.class)
public void setTopicPermTest() {
PlainAccessResource plainAccessResource = new PlainAccessResource();
Map<String, Byte> resourcePermMap = plainAccessResource.getResourcePermMap();
Permission.setTopicPerm(plainAccessResource, false, null);
Assert.assertNull(resourcePermMap);
JSONArray groups = new JSONArray();
Permission.setTopicPerm(plainAccessResource, false, groups);
Assert.assertNull(resourcePermMap);
groups.add("groupA=DENY");
groups.add("groupB=PUB|SUB");
groups.add("groupC=PUB");
Permission.setTopicPerm(plainAccessResource, false, groups);
resourcePermMap = plainAccessResource.getResourcePermMap();
byte perm = resourcePermMap.get(PlainAccessResource.getRetryTopic("groupA"));
Assert.assertEquals(perm, Permission.DENY);
perm = resourcePermMap.get(PlainAccessResource.getRetryTopic("groupB"));
Assert.assertEquals(perm, Permission.ANY);
perm = resourcePermMap.get(PlainAccessResource.getRetryTopic("groupC"));
Assert.assertEquals(perm, Permission.PUB);
JSONArray topics = new JSONArray();
topics.add("topicA=DENY");
topics.add("topicB=PUB|SUB");
topics.add("topicC=PUB");
Permission.setTopicPerm(plainAccessResource, true, topics);
perm = resourcePermMap.get("topicA");
Assert.assertEquals(perm, Permission.DENY);
perm = resourcePermMap.get("topicB");
Assert.assertEquals(perm, Permission.ANY);
perm = resourcePermMap.get("topicC");
Assert.assertEquals(perm, Permission.PUB);
JSONArray erron = new JSONArray();
erron.add("");
Permission.setTopicPerm(plainAccessResource, false, erron);
}
@Test
public void checkAdminCodeTest() {
Set<Integer> code = new HashSet<>();
code.add(17);
code.add(25);
code.add(215);
code.add(200);
code.add(207);
for (int i = 0; i < 400; i++) {
boolean boo = Permission.checkAdminCode(i);
if (boo) {
Assert.assertTrue(code.contains(i));
}
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plain;
import java.nio.ByteBuffer;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.AclUtils;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class PlainAccessValidatorTest {
PlainAccessValidator plainAccessValidator;
@Before
public void init() {
System.setProperty("rocketmq.home.dir", "src/test/resources");
plainAccessValidator = new PlainAccessValidator();
}
@Test
public void contentTest() {
SessionCredentials sessionCredentials = new SessionCredentials();
sessionCredentials.setAccessKey("RocketMQ");
sessionCredentials.setSecretKey("12345678");
sessionCredentials.setSecurityToken("87654321");
AclClientRPCHook aclClient = new AclClientRPCHook(sessionCredentials);
SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader();
messageRequestHeader.setTopic("topicA");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader);
aclClient.doBeforeRequest("", remotingCommand);
ByteBuffer buf = remotingCommand.encodeHeader();
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "127.0.0.1");
String signature = AclUtils.calSignature(accessResource.getContent(), sessionCredentials.getSecretKey());
Assert.assertEquals(accessResource.getSignature(), signature);
}
@Test
public void validateTest() {
SessionCredentials sessionCredentials = new SessionCredentials();
sessionCredentials.setAccessKey("RocketMQ");
sessionCredentials.setSecretKey("12345678");
sessionCredentials.setSecurityToken("87654321");
AclClientRPCHook aclClient = new AclClientRPCHook(sessionCredentials);
SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader();
messageRequestHeader.setTopic("topicA");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader);
aclClient.doBeforeRequest("", remotingCommand);
ByteBuffer buf = remotingCommand.encodeHeader();
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1");
plainAccessValidator.validate(accessResource);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plain;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.rocketmq.acl.plain.PlainPermissionLoader.AccessContralAnalysis;
import org.apache.rocketmq.acl.plain.PlainPermissionLoader.BrokerAccessControlTransport;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class PlainAclPlugEngineTest {
PlainPermissionLoader plainPermissionLoader;
AccessContralAnalysis accessContralAnalysis = new AccessContralAnalysis();
PlainAccessResource plainAccessResource;
PlainAccessResource plainAccessResourceTwo;
AuthenticationInfo authenticationInfo;
BrokerAccessControl brokerAccessControl;
Set<Integer> adminCode = new HashSet<>();
@Before
public void init() throws NoSuchFieldException, SecurityException, IOException {
// UPDATE_AND_CREATE_TOPIC
adminCode.add(17);
// UPDATE_BROKER_CONFIG
adminCode.add(25);
// DELETE_TOPIC_IN_BROKER
adminCode.add(215);
// UPDATE_AND_CREATE_SUBSCRIPTIONGROUP
adminCode.add(200);
// DELETE_SUBSCRIPTIONGROUP
adminCode.add(207);
accessContralAnalysis.analysisClass(RequestCode.class);
brokerAccessControl = new BrokerAccessControl();
// 321
brokerAccessControl.setQueryConsumeQueue(false);
Set<String> permitSendTopic = new HashSet<>();
permitSendTopic.add("permitSendTopic");
brokerAccessControl.setPermitSendTopic(permitSendTopic);
Set<String> noPermitSendTopic = new HashSet<>();
noPermitSendTopic.add("noPermitSendTopic");
brokerAccessControl.setNoPermitSendTopic(noPermitSendTopic);
Set<String> permitPullTopic = new HashSet<>();
permitPullTopic.add("permitPullTopic");
brokerAccessControl.setPermitPullTopic(permitPullTopic);
Set<String> noPermitPullTopic = new HashSet<>();
noPermitPullTopic.add("noPermitPullTopic");
brokerAccessControl.setNoPermitPullTopic(noPermitPullTopic);
AccessContralAnalysis accessContralAnalysis = new AccessContralAnalysis();
accessContralAnalysis.analysisClass(RequestCode.class);
Map<Integer, Boolean> map = accessContralAnalysis.analysis(brokerAccessControl);
authenticationInfo = new AuthenticationInfo(map, brokerAccessControl, RemoteAddressStrategyFactory.NULL_NET_ADDRESS_STRATEGY);
System.setProperty("rocketmq.home.dir", "src/test/resources");
plainPermissionLoader = new PlainPermissionLoader();
plainAccessResource = new BrokerAccessControl();
plainAccessResource.setAccessKey("rokcetmq");
plainAccessResource.setSignature("aliyun11");
plainAccessResource.setRemoteAddr("127.0.0.1");
plainAccessResource.setRecognition("127.0.0.1:1");
plainAccessResourceTwo = new BrokerAccessControl();
plainAccessResourceTwo.setAccessKey("rokcet1");
plainAccessResourceTwo.setSignature("aliyun1");
plainAccessResourceTwo.setRemoteAddr("127.0.0.1");
plainAccessResourceTwo.setRecognition("127.0.0.1:2");
}
@Test(expected = AclPlugRuntimeException.class)
public void accountNullTest() {
plainAccessResource.setAccessKey(null);
plainPermissionLoader.setAccessControl(plainAccessResource);
}
@Test(expected = AclPlugRuntimeException.class)
public void accountThanTest() {
plainAccessResource.setAccessKey("123");
plainPermissionLoader.setAccessControl(plainAccessResource);
}
@Test(expected = AclPlugRuntimeException.class)
public void passWordtNullTest() {
plainAccessResource.setAccessKey(null);
plainPermissionLoader.setAccessControl(plainAccessResource);
}
@Test(expected = AclPlugRuntimeException.class)
public void passWordThanTest() {
plainAccessResource.setAccessKey("123");
plainPermissionLoader.setAccessControl(plainAccessResource);
}
@Test(expected = AclPlugRuntimeException.class)
public void testPlainAclPlugEngineInit() {
System.setProperty("rocketmq.home.dir", "");
new PlainPermissionLoader().initialize();
}
@Test
public void authenticationInfoOfSetAccessControl() {
plainPermissionLoader.setAccessControl(plainAccessResource);
AuthenticationInfo authenticationInfo = plainPermissionLoader.getAccessControl(plainAccessResource);
PlainAccessResource getPlainAccessResource = authenticationInfo.getPlainAccessResource();
Assert.assertEquals(plainAccessResource, getPlainAccessResource);
PlainAccessResource testPlainAccessResource = new PlainAccessResource();
testPlainAccessResource.setAccessKey("rokcetmq");
testPlainAccessResource.setSignature("aliyun11");
testPlainAccessResource.setRemoteAddr("127.0.0.1");
testPlainAccessResource.setRecognition("127.0.0.1:1");
testPlainAccessResource.setAccessKey("rokcetmq1");
authenticationInfo = plainPermissionLoader.getAccessControl(testPlainAccessResource);
Assert.assertNull(authenticationInfo);
testPlainAccessResource.setAccessKey("rokcetmq");
testPlainAccessResource.setSignature("1234567");
authenticationInfo = plainPermissionLoader.getAccessControl(testPlainAccessResource);
Assert.assertNull(authenticationInfo);
testPlainAccessResource.setRemoteAddr("127.0.0.2");
authenticationInfo = plainPermissionLoader.getAccessControl(testPlainAccessResource);
Assert.assertNull(authenticationInfo);
}
@Test
public void setAccessControlList() {
List<PlainAccessResource> plainAccessResourceList = new ArrayList<>();
plainAccessResourceList.add(plainAccessResource);
plainAccessResourceList.add(plainAccessResourceTwo);
plainPermissionLoader.setAccessControlList(plainAccessResourceList);
AuthenticationInfo newAccessControl = plainPermissionLoader.getAccessControl(plainAccessResource);
Assert.assertEquals(plainAccessResource, newAccessControl.getPlainAccessResource());
newAccessControl = plainPermissionLoader.getAccessControl(plainAccessResourceTwo);
Assert.assertEquals(plainAccessResourceTwo, newAccessControl.getPlainAccessResource());
}
@Test
public void setNetaddressAccessControl() {
PlainAccessResource plainAccessResource = new BrokerAccessControl();
plainAccessResource.setAccessKey("RocketMQ");
plainAccessResource.setSignature("RocketMQ");
plainAccessResource.setRemoteAddr("127.0.0.1");
plainPermissionLoader.setAccessControl(plainAccessResource);
plainPermissionLoader.setNetaddressAccessControl(plainAccessResource);
AuthenticationInfo authenticationInfo = plainPermissionLoader.getAccessControl(plainAccessResource);
PlainAccessResource getPlainAccessResource = authenticationInfo.getPlainAccessResource();
Assert.assertEquals(plainAccessResource, getPlainAccessResource);
plainAccessResource.setRemoteAddr("127.0.0.2");
authenticationInfo = plainPermissionLoader.getAccessControl(plainAccessResource);
Assert.assertNull(authenticationInfo);
}
public void eachCheckLoginAndAuthentication() {
}
@Test(expected = AclPlugRuntimeException.class)
public void BrokerAccessControlTransportTestNull() {
BrokerAccessControlTransport accessControlTransport = new BrokerAccessControlTransport();
plainPermissionLoader.setBrokerAccessControlTransport(accessControlTransport);
}
@Test
public void BrokerAccessControlTransportTest() {
BrokerAccessControlTransport accessControlTransport = new BrokerAccessControlTransport();
List<BrokerAccessControl> list = new ArrayList<>();
list.add((BrokerAccessControl) this.plainAccessResourceTwo);
accessControlTransport.setOnlyNetAddress((BrokerAccessControl) this.plainAccessResource);
accessControlTransport.setList(list);
plainPermissionLoader.setBrokerAccessControlTransport(accessControlTransport);
PlainAccessResource plainAccessResource = new BrokerAccessControl();
plainAccessResource.setAccessKey("RocketMQ");
plainAccessResource.setSignature("RocketMQ");
plainAccessResource.setRemoteAddr("127.0.0.1");
plainPermissionLoader.setAccessControl(plainAccessResource);
AuthenticationInfo authenticationInfo = plainPermissionLoader.getAccessControl(plainAccessResource);
Assert.assertNotNull(authenticationInfo.getPlainAccessResource());
authenticationInfo = plainPermissionLoader.getAccessControl(plainAccessResourceTwo);
Assert.assertEquals(plainAccessResourceTwo, authenticationInfo.getPlainAccessResource());
}
@Test
public void authenticationTest() {
AuthenticationResult authenticationResult = new AuthenticationResult();
plainAccessResource.setRequestCode(317);
boolean isReturn = plainPermissionLoader.authentication(authenticationInfo, plainAccessResource, authenticationResult);
Assert.assertTrue(isReturn);
plainAccessResource.setRequestCode(321);
isReturn = plainPermissionLoader.authentication(authenticationInfo, plainAccessResource, authenticationResult);
Assert.assertFalse(isReturn);
plainAccessResource.setRequestCode(10);
plainAccessResource.setTopic("permitSendTopic");
isReturn = plainPermissionLoader.authentication(authenticationInfo, plainAccessResource, authenticationResult);
Assert.assertTrue(isReturn);
plainAccessResource.setRequestCode(310);
isReturn = plainPermissionLoader.authentication(authenticationInfo, plainAccessResource, authenticationResult);
Assert.assertTrue(isReturn);
plainAccessResource.setRequestCode(320);
isReturn = plainPermissionLoader.authentication(authenticationInfo, plainAccessResource, authenticationResult);
Assert.assertTrue(isReturn);
plainAccessResource.setTopic("noPermitSendTopic");
isReturn = plainPermissionLoader.authentication(authenticationInfo, plainAccessResource, authenticationResult);
Assert.assertFalse(isReturn);
plainAccessResource.setTopic("nopermitSendTopic");
isReturn = plainPermissionLoader.authentication(authenticationInfo, plainAccessResource, authenticationResult);
Assert.assertFalse(isReturn);
plainAccessResource.setRequestCode(11);
plainAccessResource.setTopic("permitPullTopic");
isReturn = plainPermissionLoader.authentication(authenticationInfo, plainAccessResource, authenticationResult);
Assert.assertTrue(isReturn);
plainAccessResource.setTopic("noPermitPullTopic");
isReturn = plainPermissionLoader.authentication(authenticationInfo, plainAccessResource, authenticationResult);
Assert.assertFalse(isReturn);
plainAccessResource.setTopic("nopermitPullTopic");
isReturn = plainPermissionLoader.authentication(authenticationInfo, plainAccessResource, authenticationResult);
Assert.assertFalse(isReturn);
}
@Test
public void isEmptyTest() {
AuthenticationResult authenticationResult = new AuthenticationResult();
plainAccessResource.setRequestCode(10);
plainAccessResource.setTopic("absentTopic");
boolean isReturn = plainPermissionLoader.authentication(authenticationInfo, plainAccessResource, authenticationResult);
Assert.assertFalse(isReturn);
Set<String> permitSendTopic = new HashSet<>();
brokerAccessControl.setPermitSendTopic(permitSendTopic);
isReturn = plainPermissionLoader.authentication(authenticationInfo, plainAccessResource, authenticationResult);
Assert.assertTrue(isReturn);
plainAccessResource.setRequestCode(11);
isReturn = plainPermissionLoader.authentication(authenticationInfo, plainAccessResource, authenticationResult);
Assert.assertFalse(isReturn);
brokerAccessControl.setPermitPullTopic(permitSendTopic);
isReturn = plainPermissionLoader.authentication(authenticationInfo, plainAccessResource, authenticationResult);
Assert.assertTrue(isReturn);
}
@Test
public void adminBrokerAccessControlTest() {
BrokerAccessControl admin = new BrokerAccessControl();
admin.setAccessKey("adminTest");
admin.setSignature("adminTest");
admin.setRemoteAddr("127.0.0.1");
plainPermissionLoader.setAccessControl(admin);
Assert.assertFalse(admin.isUpdateAndCreateTopic());
admin.setAdmin(true);
plainPermissionLoader.setAccessControl(admin);
Assert.assertTrue(admin.isUpdateAndCreateTopic());
}
@Test
public void adminEachCheckAuthentication() {
BrokerAccessControl accessControl = new BrokerAccessControl();
accessControl.setAccessKey("RocketMQ1");
accessControl.setSignature("1234567");
accessControl.setRemoteAddr("127.0.0.1");
plainPermissionLoader.setAccessControl(accessControl);
for (Integer code : adminCode) {
accessControl.setRequestCode(code);
AuthenticationResult authenticationResult = plainPermissionLoader.eachCheckAuthentication(accessControl);
Assert.assertFalse(authenticationResult.isSucceed());
}
plainPermissionLoader.cleanAuthenticationInfo();
accessControl.setAdmin(true);
plainPermissionLoader.setAccessControl(accessControl);
for (Integer code : adminCode) {
accessControl.setRequestCode(code);
AuthenticationResult authenticationResult = plainPermissionLoader.eachCheckAuthentication(accessControl);
Assert.assertTrue(authenticationResult.isSucceed());
}
}
@Test
public void cleanAuthenticationInfoTest() {
plainPermissionLoader.setAccessControl(plainAccessResource);
plainAccessResource.setRequestCode(202);
AuthenticationResult authenticationResult = plainPermissionLoader.eachCheckAuthentication(plainAccessResource);
Assert.assertTrue(authenticationResult.isSucceed());
plainPermissionLoader.cleanAuthenticationInfo();
authenticationResult = plainPermissionLoader.eachCheckAuthentication(plainAccessResource);
Assert.assertFalse(authenticationResult.isSucceed());
}
@Test
public void isWatchStartTest() {
PlainPermissionLoader plainPermissionLoader = new PlainPermissionLoader();
Assert.assertTrue(plainPermissionLoader.isWatchStart());
System.setProperty("java.version", "1.6.11");
plainPermissionLoader = new PlainPermissionLoader();
Assert.assertFalse(plainPermissionLoader.isWatchStart());
}
@Test
public void watchTest() throws IOException {
System.setProperty("rocketmq.home.dir", "src/test/resources/watch");
File file = new File("src/test/resources/watch/conf");
file.mkdirs();
File transport = new File("src/test/resources/watch/conf/transport.yml");
transport.createNewFile();
FileWriter writer = new FileWriter(transport);
writer.write("list:\r\n");
writer.write("- account: rokcetmq\r\n");
writer.write(" password: aliyun11\r\n");
writer.write(" netaddress: 127.0.0.1\r\n");
writer.flush();
writer.close();
PlainPermissionLoader plainPermissionLoader = new PlainPermissionLoader();
plainAccessResource.setRequestCode(203);
AuthenticationResult authenticationResult = plainPermissionLoader.eachCheckAuthentication(plainAccessResource);
Assert.assertTrue(authenticationResult.isSucceed());
writer = new FileWriter(new File("src/test/resources/watch/conf/transport.yml"), true);
writer.write("- account: rokcet1\r\n");
writer.write(" password: aliyun1\r\n");
writer.write(" netaddress: 127.0.0.1\r\n");
writer.flush();
writer.close();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
plainAccessResourceTwo.setRequestCode(203);
authenticationResult = plainPermissionLoader.eachCheckAuthentication(plainAccessResourceTwo);
Assert.assertTrue(authenticationResult.isSucceed());
transport.delete();
file.delete();
file = new File("src/test/resources/watch");
file.delete();
}
@Test
public void analysisTest() {
BrokerAccessControl accessControl = new BrokerAccessControl();
accessControl.setSendMessage(false);
Map<Integer, Boolean> map = accessContralAnalysis.analysis(accessControl);
Iterator<Entry<Integer, Boolean>> it = map.entrySet().iterator();
long num = 0;
while (it.hasNext()) {
Entry<Integer, Boolean> e = it.next();
if (!e.getValue()) {
if (adminCode.contains(e.getKey())) {
continue;
}
Assert.assertEquals(e.getKey(), Integer.valueOf(10));
num++;
}
}
Assert.assertEquals(num, 1);
}
@Test(expected = AclPlugRuntimeException.class)
public void analysisExceptionTest() {
PlainAccessResource plainAccessResource = new PlainAccessResource();
accessContralAnalysis.analysis(plainAccessResource);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plain;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.acl.common.AclException;
import org.apache.rocketmq.acl.common.AclUtils;
import org.apache.rocketmq.acl.common.Permission;
import org.apache.rocketmq.common.MixAll;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@RunWith(PowerMockRunner.class)
@PrepareForTest({AclUtils.class})
public class PlainPermissionLoaderTest {
PlainPermissionLoader plainPermissionLoader;
PlainAccessResource PUBPlainAccessResource;
PlainAccessResource SUBPlainAccessResource;
PlainAccessResource ANYPlainAccessResource;
PlainAccessResource DENYPlainAccessResource;
PlainAccessResource plainAccessResource = new PlainAccessResource();
PlainAccessResource plainAccessResourceTwo = new PlainAccessResource();
Set<Integer> adminCode = new HashSet<>();
private String fileName = System.getProperty("romcketmq.acl.plain.fileName", "/conf/transport.yml");
private Map<String/** account **/
, List<PlainAccessResource>> plainAccessResourceMap;
private List<RemoteAddressStrategy> globalWhiteRemoteAddressStrategy;
@Before
public void init() throws NoSuchFieldException, SecurityException, IOException {
// UPDATE_AND_CREATE_TOPIC
adminCode.add(17);
// UPDATE_BROKER_CONFIG
adminCode.add(25);
// DELETE_TOPIC_IN_BROKER
adminCode.add(215);
// UPDATE_AND_CREATE_SUBSCRIPTIONGROUP
adminCode.add(200);
// DELETE_SUBSCRIPTIONGROUP
adminCode.add(207);
PUBPlainAccessResource = clonePlainAccessResource(Permission.PUB);
SUBPlainAccessResource = clonePlainAccessResource(Permission.SUB);
ANYPlainAccessResource = clonePlainAccessResource(Permission.ANY);
DENYPlainAccessResource = clonePlainAccessResource(Permission.DENY);
System.setProperty("java.version", "1.6.11");
System.setProperty("rocketmq.home.dir", "src/test/resources");
plainPermissionLoader = new PlainPermissionLoader();
}
public PlainAccessResource clonePlainAccessResource(byte perm) {
PlainAccessResource painAccessResource = new PlainAccessResource();
painAccessResource.setAccessKey("RocketMQ");
painAccessResource.setSecretKey("12345678");
painAccessResource.setWhiteRemoteAddress("127.0." + perm + ".*");
painAccessResource.setDefaultGroupPerm(perm);
painAccessResource.setDefaultTopicPerm(perm);
painAccessResource.addResourceAndPerm(PlainAccessResource.getRetryTopic("groupA"), Permission.PUB);
painAccessResource.addResourceAndPerm(PlainAccessResource.getRetryTopic("groupB"), Permission.SUB);
painAccessResource.addResourceAndPerm(PlainAccessResource.getRetryTopic("groupC"), Permission.ANY);
painAccessResource.addResourceAndPerm(PlainAccessResource.getRetryTopic("groupD"), Permission.DENY);
painAccessResource.addResourceAndPerm("topicA", Permission.PUB);
painAccessResource.addResourceAndPerm("topicB", Permission.SUB);
painAccessResource.addResourceAndPerm("topicC", Permission.ANY);
painAccessResource.addResourceAndPerm("topicD", Permission.DENY);
return painAccessResource;
}
@SuppressWarnings("unchecked")
private void getField(PlainPermissionLoader plainPermissionLoader) {
try {
this.globalWhiteRemoteAddressStrategy = (List<RemoteAddressStrategy>) FieldUtils.readDeclaredField(plainPermissionLoader, "globalWhiteRemoteAddressStrategy", true);
this.plainAccessResourceMap = (Map<String/** account **/, List<PlainAccessResource>>) FieldUtils.readDeclaredField(plainPermissionLoader, "plainAccessResourceMap", true);
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
@Test(expected = AclException.class)
public void initializeTest() {
System.setProperty("romcketmq.acl.plain.fileName", "/conf/transport-null.yml");
new PlainPermissionLoader();
}
@Test
public void initializeIngetYamlDataObject() {
String fileHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
PowerMockito.mockStatic(AclUtils.class);
JSONObject json = new JSONObject();
json.put("", "");
PowerMockito.when(AclUtils.getYamlDataObject(fileHome + "/conf/transport.yml", JSONObject.class)).thenReturn(json);
PlainPermissionLoader plainPermissionLoader = new PlainPermissionLoader();
getField(plainPermissionLoader);
Assert.assertTrue(globalWhiteRemoteAddressStrategy.isEmpty());
Assert.assertTrue(plainAccessResourceMap.isEmpty());
}
@Test
public void getPlainAccessResourceTest() {
PlainAccessResource plainAccessResource = new PlainAccessResource();
JSONObject account = new JSONObject();
account.put("accessKey", "RocketMQ");
plainAccessResource = plainPermissionLoader.getPlainAccessResource(account);
Assert.assertEquals(plainAccessResource.getAccessKey(), "RocketMQ");
account.put("secretKey", "12345678");
plainAccessResource = plainPermissionLoader.getPlainAccessResource(account);
Assert.assertEquals(plainAccessResource.getSecretKey(), "12345678");
account.put("whiteRemoteAddress", "127.0.0.1");
plainAccessResource = plainPermissionLoader.getPlainAccessResource(account);
Assert.assertEquals(plainAccessResource.getWhiteRemoteAddress(), "127.0.0.1");
account.put("admin", true);
plainAccessResource = plainPermissionLoader.getPlainAccessResource(account);
Assert.assertEquals(plainAccessResource.isAdmin(), true);
account.put("defaultGroupPerm", "ANY");
plainAccessResource = plainPermissionLoader.getPlainAccessResource(account);
Assert.assertEquals(plainAccessResource.getDefaultGroupPerm(), Permission.ANY);
account.put("defaultTopicPerm", "ANY");
plainAccessResource = plainPermissionLoader.getPlainAccessResource(account);
Assert.assertEquals(plainAccessResource.getDefaultTopicPerm(), Permission.ANY);
JSONArray groups = new JSONArray();
groups.add("groupA=DENY");
groups.add("groupB=PUB|SUB");
groups.add("groupC=PUB");
account.put("groups", groups);
plainAccessResource = plainPermissionLoader.getPlainAccessResource(account);
Map<String, Byte> resourcePermMap = plainAccessResource.getResourcePermMap();
Assert.assertEquals(resourcePermMap.size(), 3);
Assert.assertEquals(resourcePermMap.get("groupA").byteValue(), Permission.DENY);
Assert.assertEquals(resourcePermMap.get("groupB").byteValue(), Permission.ANY);
Assert.assertEquals(resourcePermMap.get("groupC").byteValue(), Permission.PUB);
JSONArray topics = new JSONArray();
topics.add("topicA=DENY");
topics.add("topicB=PUB|SUB");
topics.add("topicC=PUB");
account.put("topics", topics);
plainAccessResource = plainPermissionLoader.getPlainAccessResource(account);
resourcePermMap = plainAccessResource.getResourcePermMap();
Assert.assertEquals(resourcePermMap.size(), 3);
Assert.assertEquals(resourcePermMap.get("topicA").byteValue(), Permission.DENY);
Assert.assertEquals(resourcePermMap.get("topicB").byteValue(), Permission.ANY);
Assert.assertEquals(resourcePermMap.get("topicC").byteValue(), Permission.PUB);
}
@Test(expected = AclException.class)
public void checkPermAdmin() {
PlainAccessResource plainAccessResource = new PlainAccessResource();
plainAccessResource.setRequestCode(17);
plainPermissionLoader.checkPerm(plainAccessResource, PUBPlainAccessResource);
}
@Test
public void checkPerm() {
PlainAccessResource plainAccessResource = new PlainAccessResource();
plainAccessResource.addResourceAndPerm("pub", Permission.PUB);
plainPermissionLoader.checkPerm(PUBPlainAccessResource, plainAccessResource);
plainAccessResource.addResourceAndPerm("sub", Permission.SUB);
plainPermissionLoader.checkPerm(ANYPlainAccessResource, plainAccessResource);
plainAccessResource = new PlainAccessResource();
plainAccessResource.addResourceAndPerm("sub", Permission.SUB);
plainPermissionLoader.checkPerm(SUBPlainAccessResource, plainAccessResource);
plainAccessResource.addResourceAndPerm("pub", Permission.PUB);
plainPermissionLoader.checkPerm(ANYPlainAccessResource, plainAccessResource);
}
@Test(expected = AclException.class)
public void accountNullTest() {
plainAccessResource.setAccessKey(null);
plainPermissionLoader.setPlainAccessResource(plainAccessResource);
}
@Test(expected = AclException.class)
public void accountThanTest() {
plainAccessResource.setAccessKey("123");
plainPermissionLoader.setPlainAccessResource(plainAccessResource);
}
@Test(expected = AclException.class)
public void passWordtNullTest() {
plainAccessResource.setAccessKey(null);
plainPermissionLoader.setPlainAccessResource(plainAccessResource);
}
@Test(expected = AclException.class)
public void passWordThanTest() {
plainAccessResource.setAccessKey("123");
plainPermissionLoader.setPlainAccessResource(plainAccessResource);
}
@Test(expected = AclException.class)
public void testPlainAclPlugEngineInit() {
System.setProperty("rocketmq.home.dir", "");
new PlainPermissionLoader().initialize();
}
@Test
public void cleanAuthenticationInfoTest() {
plainPermissionLoader.setPlainAccessResource(plainAccessResource);
plainAccessResource.setRequestCode(202);
plainPermissionLoader.eachCheckPlainAccessResource(plainAccessResource);
plainPermissionLoader.cleanAuthenticationInfo();
plainPermissionLoader.eachCheckPlainAccessResource(plainAccessResource);
}
@Test
public void isWatchStartTest() {
PlainPermissionLoader plainPermissionLoader = new PlainPermissionLoader();
Assert.assertTrue(plainPermissionLoader.isWatchStart());
System.setProperty("java.version", "1.6.11");
plainPermissionLoader = new PlainPermissionLoader();
Assert.assertFalse(plainPermissionLoader.isWatchStart());
}
@Test
public void watchTest() throws IOException {
System.setProperty("rocketmq.home.dir", "src/test/resources/watch");
File file = new File("src/test/resources/watch/conf");
file.mkdirs();
File transport = new File("src/test/resources/watch/conf/transport.yml");
transport.createNewFile();
FileWriter writer = new FileWriter(transport);
writer.write("list:\r\n");
writer.write("- account: rokcetmq\r\n");
writer.write(" password: aliyun11\r\n");
writer.write(" netaddress: 127.0.0.1\r\n");
writer.flush();
writer.close();
PlainPermissionLoader plainPermissionLoader = new PlainPermissionLoader();
plainAccessResource.setRequestCode(203);
plainPermissionLoader.eachCheckPlainAccessResource(plainAccessResource);
writer = new FileWriter(new File("src/test/resources/watch/conf/transport.yml"), true);
writer.write("- account: rokcet1\r\n");
writer.write(" password: aliyun1\r\n");
writer.write(" netaddress: 127.0.0.1\r\n");
writer.flush();
writer.close();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
plainAccessResourceTwo.setRequestCode(203);
plainPermissionLoader.eachCheckPlainAccessResource(plainAccessResourceTwo);
transport.delete();
file.delete();
file = new File("src/test/resources/watch");
file.delete();
}
}
......@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.acl.plain;
import org.apache.rocketmq.acl.common.AclException;
import org.junit.Assert;
import org.junit.Test;
......@@ -29,41 +30,41 @@ public class RemoteAddressStrategyTest {
RemoteAddressStrategy remoteAddressStrategy = remoteAddressStrategyFactory.getNetaddressStrategy(plainAccessResource);
Assert.assertEquals(remoteAddressStrategy, RemoteAddressStrategyFactory.NULL_NET_ADDRESS_STRATEGY);
plainAccessResource.setRemoteAddr("*");
plainAccessResource.setWhiteRemoteAddress("*");
remoteAddressStrategy = remoteAddressStrategyFactory.getNetaddressStrategy(plainAccessResource);
Assert.assertEquals(remoteAddressStrategy, RemoteAddressStrategyFactory.NULL_NET_ADDRESS_STRATEGY);
plainAccessResource.setRemoteAddr("127.0.0.1");
plainAccessResource.setWhiteRemoteAddress("127.0.0.1");
remoteAddressStrategy = remoteAddressStrategyFactory.getNetaddressStrategy(plainAccessResource);
Assert.assertEquals(remoteAddressStrategy.getClass(), RemoteAddressStrategyFactory.OneRemoteAddressStrategy.class);
plainAccessResource.setRemoteAddr("127.0.0.1,127.0.0.2,127.0.0.3");
plainAccessResource.setWhiteRemoteAddress("127.0.0.1,127.0.0.2,127.0.0.3");
remoteAddressStrategy = remoteAddressStrategyFactory.getNetaddressStrategy(plainAccessResource);
Assert.assertEquals(remoteAddressStrategy.getClass(), RemoteAddressStrategyFactory.MultipleRemoteAddressStrategy.class);
plainAccessResource.setRemoteAddr("127.0.0.{1,2,3}");
plainAccessResource.setWhiteRemoteAddress("127.0.0.{1,2,3}");
remoteAddressStrategy = remoteAddressStrategyFactory.getNetaddressStrategy(plainAccessResource);
Assert.assertEquals(remoteAddressStrategy.getClass(), RemoteAddressStrategyFactory.MultipleRemoteAddressStrategy.class);
plainAccessResource.setRemoteAddr("127.0.0.1-200");
plainAccessResource.setWhiteRemoteAddress("127.0.0.1-200");
remoteAddressStrategy = remoteAddressStrategyFactory.getNetaddressStrategy(plainAccessResource);
Assert.assertEquals(remoteAddressStrategy.getClass(), RemoteAddressStrategyFactory.RangeRemoteAddressStrategy.class);
plainAccessResource.setRemoteAddr("127.0.0.*");
plainAccessResource.setWhiteRemoteAddress("127.0.0.*");
remoteAddressStrategy = remoteAddressStrategyFactory.getNetaddressStrategy(plainAccessResource);
Assert.assertEquals(remoteAddressStrategy.getClass(), RemoteAddressStrategyFactory.RangeRemoteAddressStrategy.class);
plainAccessResource.setRemoteAddr("127.0.1-20.*");
plainAccessResource.setWhiteRemoteAddress("127.0.1-20.*");
remoteAddressStrategy = remoteAddressStrategyFactory.getNetaddressStrategy(plainAccessResource);
Assert.assertEquals(remoteAddressStrategy.getClass(), RemoteAddressStrategyFactory.RangeRemoteAddressStrategy.class);
}
@Test(expected = AclPlugRuntimeException.class)
@Test(expected = AclException.class)
public void verifyTest() {
PlainAccessResource plainAccessResource = new PlainAccessResource();
plainAccessResource.setRemoteAddr("127.0.0.1");
plainAccessResource.setWhiteRemoteAddress("127.0.0.1");
remoteAddressStrategyFactory.getNetaddressStrategy(plainAccessResource);
plainAccessResource.setRemoteAddr("256.0.0.1");
plainAccessResource.setWhiteRemoteAddress("256.0.0.1");
remoteAddressStrategyFactory.getNetaddressStrategy(plainAccessResource);
}
......@@ -75,17 +76,17 @@ public class RemoteAddressStrategyTest {
public void oneNetaddressStrategyTest() {
PlainAccessResource plainAccessResource = new PlainAccessResource();
plainAccessResource.setRemoteAddr("127.0.0.1");
plainAccessResource.setWhiteRemoteAddress("127.0.0.1");
RemoteAddressStrategy remoteAddressStrategy = remoteAddressStrategyFactory.getNetaddressStrategy(plainAccessResource);
plainAccessResource.setRemoteAddr("");
plainAccessResource.setWhiteRemoteAddress("");
boolean match = remoteAddressStrategy.match(plainAccessResource);
Assert.assertFalse(match);
plainAccessResource.setRemoteAddr("127.0.0.2");
plainAccessResource.setWhiteRemoteAddress("127.0.0.2");
match = remoteAddressStrategy.match(plainAccessResource);
Assert.assertFalse(match);
plainAccessResource.setRemoteAddr("127.0.0.1");
plainAccessResource.setWhiteRemoteAddress("127.0.0.1");
match = remoteAddressStrategy.match(plainAccessResource);
Assert.assertTrue(match);
}
......@@ -93,42 +94,42 @@ public class RemoteAddressStrategyTest {
@Test
public void multipleNetaddressStrategyTest() {
PlainAccessResource plainAccessResource = new PlainAccessResource();
plainAccessResource.setRemoteAddr("127.0.0.1,127.0.0.2,127.0.0.3");
plainAccessResource.setWhiteRemoteAddress("127.0.0.1,127.0.0.2,127.0.0.3");
RemoteAddressStrategy remoteAddressStrategy = remoteAddressStrategyFactory.getNetaddressStrategy(plainAccessResource);
multipleNetaddressStrategyTest(remoteAddressStrategy);
plainAccessResource.setRemoteAddr("127.0.0.{1,2,3}");
plainAccessResource.setWhiteRemoteAddress("127.0.0.{1,2,3}");
remoteAddressStrategy = remoteAddressStrategyFactory.getNetaddressStrategy(plainAccessResource);
multipleNetaddressStrategyTest(remoteAddressStrategy);
}
@Test(expected = AclPlugRuntimeException.class)
@Test(expected = AclException.class)
public void multipleNetaddressStrategyExceptionTest() {
PlainAccessResource plainAccessResource = new PlainAccessResource();
plainAccessResource.setRemoteAddr("127.0.0.1,2,3}");
plainAccessResource.setWhiteRemoteAddress("127.0.0.1,2,3}");
remoteAddressStrategyFactory.getNetaddressStrategy(plainAccessResource);
}
private void multipleNetaddressStrategyTest(RemoteAddressStrategy remoteAddressStrategy) {
PlainAccessResource plainAccessResource = new PlainAccessResource();
plainAccessResource.setRemoteAddr("127.0.0.1");
plainAccessResource.setWhiteRemoteAddress("127.0.0.1");
boolean match = remoteAddressStrategy.match(plainAccessResource);
Assert.assertTrue(match);
plainAccessResource.setRemoteAddr("127.0.0.2");
plainAccessResource.setWhiteRemoteAddress("127.0.0.2");
match = remoteAddressStrategy.match(plainAccessResource);
Assert.assertTrue(match);
plainAccessResource.setRemoteAddr("127.0.0.3");
plainAccessResource.setWhiteRemoteAddress("127.0.0.3");
match = remoteAddressStrategy.match(plainAccessResource);
Assert.assertTrue(match);
plainAccessResource.setRemoteAddr("127.0.0.4");
plainAccessResource.setWhiteRemoteAddress("127.0.0.4");
match = remoteAddressStrategy.match(plainAccessResource);
Assert.assertFalse(match);
plainAccessResource.setRemoteAddr("127.0.0.0");
plainAccessResource.setWhiteRemoteAddress("127.0.0.0");
match = remoteAddressStrategy.match(plainAccessResource);
Assert.assertFalse(match);
......@@ -138,23 +139,24 @@ public class RemoteAddressStrategyTest {
public void rangeNetaddressStrategyTest() {
String head = "127.0.0.";
PlainAccessResource plainAccessResource = new PlainAccessResource();
plainAccessResource.setRemoteAddr("127.0.0.1-200");
plainAccessResource.setWhiteRemoteAddress("127.0.0.1-200");
RemoteAddressStrategy remoteAddressStrategy = remoteAddressStrategyFactory.getNetaddressStrategy(plainAccessResource);
rangeNetaddressStrategyTest(remoteAddressStrategy, head, 1, 200, true);
plainAccessResource.setRemoteAddr("127.0.0.*");
plainAccessResource.setWhiteRemoteAddress("127.0.0.*");
remoteAddressStrategy = remoteAddressStrategyFactory.getNetaddressStrategy(plainAccessResource);
rangeNetaddressStrategyTest(remoteAddressStrategy, head, 0, 255, true);
plainAccessResource.setRemoteAddr("127.0.1-200.*");
plainAccessResource.setWhiteRemoteAddress("127.0.1-200.*");
remoteAddressStrategy = remoteAddressStrategyFactory.getNetaddressStrategy(plainAccessResource);
rangeNetaddressStrategyThirdlyTest(remoteAddressStrategy, head, 1, 200);
}
private void rangeNetaddressStrategyTest(RemoteAddressStrategy remoteAddressStrategy, String head, int start, int end,
private void rangeNetaddressStrategyTest(RemoteAddressStrategy remoteAddressStrategy, String head, int start,
int end,
boolean isFalse) {
PlainAccessResource plainAccessResource = new PlainAccessResource();
for (int i = -10; i < 300; i++) {
plainAccessResource.setRemoteAddr(head + i);
plainAccessResource.setWhiteRemoteAddress(head + i);
boolean match = remoteAddressStrategy.match(plainAccessResource);
if (isFalse && i >= start && i <= end) {
Assert.assertTrue(match);
......@@ -176,24 +178,24 @@ public class RemoteAddressStrategyTest {
}
}
@Test(expected = AclPlugRuntimeException.class)
@Test(expected = AclException.class)
public void rangeNetaddressStrategyExceptionStartGreaterEndTest() {
rangeNetaddressStrategyExceptionTest("127.0.0.2-1");
}
@Test(expected = AclPlugRuntimeException.class)
@Test(expected = AclException.class)
public void rangeNetaddressStrategyExceptionScopeTest() {
rangeNetaddressStrategyExceptionTest("127.0.0.-1-200");
}
@Test(expected = AclPlugRuntimeException.class)
@Test(expected = AclException.class)
public void rangeNetaddressStrategyExceptionScopeTwoTest() {
rangeNetaddressStrategyExceptionTest("127.0.0.0-256");
}
private void rangeNetaddressStrategyExceptionTest(String netaddress) {
PlainAccessResource plainAccessResource = new PlainAccessResource();
plainAccessResource.setRemoteAddr(netaddress);
plainAccessResource.setWhiteRemoteAddress(netaddress);
remoteAddressStrategyFactory.getNetaddressStrategy(plainAccessResource);
}
......
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
## suggested format
......@@ -13,36 +13,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
onlyNetAddress:
remoteAddr: 10.10.103.*
noPermitPullTopic:
- broker-a
list:
- accessKey: RocketMQ
signature: 1234567
remoteAddr: 192.0.0.*
admin: true
permitSendTopic:
- test1
- test2
- accessKey: RocketMQ
signature: 1234567
remoteAddr: 192.0.2.1
permitSendTopic:
- test3
- test4
## suggested format
globalWhiteRemoteAddresses:
- 10.10.103.*
- 192.168.0.*
- 10.10.103.*
- 192.168.0.*
accounts:
- accessKey: ak1
secretKey: sk1
- accessKey: RocketMQ
secretKey: 12345678
whiteRemoteAddress: 192.168.0.*
admin: false
defaultTopicPerm: DENY
......@@ -57,8 +36,8 @@ accounts:
- groupB=SUB
- groupC=SUB
- accessKey: ak2
secretKey: sk2
- accessKey: aliyun.com
secretKey: 12345678
whiteRemoteAddress: 192.168.1.*
# if it is admin, it could access all resources
admin: true
......
......@@ -19,13 +19,13 @@ onlyNetAddress:
- broker-a
list:
- account: RocketMQ
- accessKey: RocketMQ
signature: 1234567
remoteAddr: 192.168.0.*
permitSendTopic:
- TopicTest
- test2
- account: RocketMQ
- accessKey: RocketMQ
signature: 1234567
remoteAddr: 192.168.2.1
permitSendTopic:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册