提交 9e0021db 编写于 作者: H hujie

acl-plug rudimentary model

上级 7cae5839
<?xml version="1.0"?>
<project
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.4.0-SNAPSHOT</version>
</parent>
<artifactId>rocketmq-acl-plug</artifactId>
<name>rocketmq-acl-plug ${project.version}</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>1.19</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
</dependencies>
</project>
package org.apache.rocketmq.acl.plug;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.rocketmq.acl.plug.annotation.RequestCode;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
public class AccessContralAnalysis {
private Map<Class<?>, Map<Integer, Field>> classTocodeAndMentod = new HashMap<>();
public Map<Integer , Boolean> analysis(AccessControl accessControl) {
Class<? extends AccessControl> clazz = accessControl.getClass();
Map<Integer, Field> codeAndField = classTocodeAndMentod.get(clazz);
if (codeAndField == null) {
codeAndField = new HashMap<>();
Field[] fields = clazz.getDeclaredFields();
for (Field field : fields) {
RequestCode requestCode = field.getAnnotation(RequestCode.class);
if (requestCode != null) {
int code = requestCode.code();
if (codeAndField.containsKey(code)) {
} else {
field.setAccessible(true);
codeAndField.put(code, field);
}
}
}
classTocodeAndMentod.put(clazz, codeAndField);
}
Iterator<Entry<Integer, Field>> it = codeAndField.entrySet().iterator();
Map<Integer, Boolean> authority = new HashMap<>();
try {
while (it.hasNext()) {
Entry<Integer, Field> e = it.next();
authority.put(e.getKey(), (Boolean)e.getValue().get(accessControl));
}
} catch (IllegalArgumentException | IllegalAccessException e1) {
e1.printStackTrace();
}
return authority;
}
}
package org.apache.rocketmq.acl.plug;
import org.apache.rocketmq.acl.plug.engine.AclPlugEngine;
import org.apache.rocketmq.acl.plug.engine.PlainAclPlugEngine;
import org.apache.rocketmq.acl.plug.entity.ControllerParametersEntity;
public class AclPlugController {
private ControllerParametersEntity controllerParametersEntity;
private AclPlugEngine aclPlugEngine;
private AclRemotingServer aclRemotingServer;
public AclPlugController(ControllerParametersEntity controllerParametersEntity){
this.controllerParametersEntity = controllerParametersEntity;
aclPlugEngine = new PlainAclPlugEngine();
aclRemotingServer = new DefaultAclRemotingServerImpl(aclPlugEngine);
}
public AclRemotingServer getAclRemotingServer() {
return this.aclRemotingServer;
}
public boolean isStartSucceed() {
return true;
}
}
package org.apache.rocketmq.acl.plug;
public class AclPlugServer {
}
package org.apache.rocketmq.acl.plug;
import org.apache.rocketmq.acl.plug.entity.AuthenticationInfo;
import org.apache.rocketmq.acl.plug.entity.LoginOrRequestAccessControl;
public interface AclRemotingServer {
public AuthenticationInfo login();
public AuthenticationInfo eachCheck(LoginOrRequestAccessControl accessControl);
}
package org.apache.rocketmq.acl.plug;
import org.apache.commons.lang3.StringUtils;
public class AclUtils {
public static String[] getAddreeStrArray(String netaddress ,String four ) {
String[] fourStrArray = StringUtils.split(four.substring(1, four.length()-1) , ",");
String address = netaddress.substring(0, netaddress.indexOf("{") );
String[] addreeStrArray = new String[ fourStrArray.length ];
for(int i = 0 ; i < fourStrArray.length ; i++) {
addreeStrArray[i] = address+fourStrArray[i];
}
return addreeStrArray;
}
public static boolean isScope(String num, int index) {
String[] strArray = StringUtils.split(num , ".");
if(strArray.length != 4) {
return false;
}
return isScope(strArray, index);
}
public static boolean isScope(String[] num, int index) {
if (num.length <= index) {
}
for (int i = 0; i < index; i++) {
if( !isScope(num[i])) {
return false;
}
}
return true;
}
public static boolean isScope(String num) {
return isScope(Integer.valueOf(num.trim()));
}
public static boolean isScope(int num) {
return num >= 0 && num <= 255;
}
public static boolean isAsterisk(String asterisk) {
return asterisk.indexOf('*') > -1;
}
public static boolean isColon(String colon) {
return colon.indexOf(',') > -1;
}
public static boolean isMinus(String minus) {
return minus.indexOf('-') > -1;
}
}
package org.apache.rocketmq.acl.plug;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.entity.AuthenticationInfo;
import org.apache.rocketmq.acl.plug.entity.AuthenticationResult;
import org.apache.rocketmq.acl.plug.entity.BorkerAccessControl;
import org.apache.rocketmq.acl.plug.entity.LoginOrRequestAccessControl;
public class Authentication {
public boolean authentication(AuthenticationInfo authenticationInfo, LoginOrRequestAccessControl loginOrRequestAccessControl,AuthenticationResult authenticationResult) {
int code = loginOrRequestAccessControl.getCode();
if (authenticationInfo.getAuthority().get(code)) {
AccessControl accessControl = authenticationInfo.getAccessControl();
if( !(accessControl instanceof BorkerAccessControl)) {
return true;
}
BorkerAccessControl borker = (BorkerAccessControl) authenticationInfo.getAccessControl();
String topicName = loginOrRequestAccessControl.getTopic();
if (code == 10 || code == 310 || code == 320) {
if (borker.getPermitSendTopic().contains(topicName)) {
return true;
}
if (borker.getNoPermitSendTopic().contains(topicName)) {
authenticationResult.setResultString(String.format("noPermitSendTopic include %s", topicName));
return false;
}
return true;
} else if (code == 11) {
if (borker.getPermitPullTopic().contains(topicName)) {
return true;
}
if (borker.getNoPermitPullTopic().contains(topicName)) {
authenticationResult.setResultString(String.format("noPermitPullTopic include %s", topicName));
return false;
}
return true;
}
return true;
}
return false;
}
}
package org.apache.rocketmq.acl.plug;
import org.apache.rocketmq.acl.plug.engine.AclPlugEngine;
import org.apache.rocketmq.acl.plug.entity.AuthenticationInfo;
import org.apache.rocketmq.acl.plug.entity.LoginOrRequestAccessControl;
public class DefaultAclRemotingServerImpl implements AclRemotingServer {
private AclPlugEngine aclPlugEngine;
public DefaultAclRemotingServerImpl(AclPlugEngine aclPlugEngine ) {
this.aclPlugEngine = aclPlugEngine;
}
@Override
public AuthenticationInfo login() {
return null;
}
@Override
public AuthenticationInfo eachCheck(LoginOrRequestAccessControl accessControl) {
aclPlugEngine.eachCheckLoginAndAuthentication(accessControl);
return null;
}
}
package org.apache.rocketmq.acl.plug;
import org.apache.rocketmq.acl.plug.entity.AuthenticationInfo;
public class EmptyImplementationAclRemotingServer implements AclRemotingServer {
@Override
public AuthenticationInfo login() {
return null;
}
@Override
public AuthenticationInfo eachCheck() {
// TODO Auto-generated method stub
return null;
}
}
package org.apache.rocketmq.acl.plug.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD})
public @interface RequestCode {
int code();
}
package org.apache.rocketmq.acl.plug.engine;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.entity.AuthenticationInfo;
import org.apache.rocketmq.acl.plug.entity.AuthenticationResult;
import org.apache.rocketmq.acl.plug.entity.LoginInfo;
import org.apache.rocketmq.acl.plug.entity.LoginOrRequestAccessControl;
public interface AclPlugEngine {
public AuthenticationInfo getAccessControl(AccessControl accessControl) ;
public LoginInfo getLoginInfo(AccessControl accessControl) ;
public AuthenticationResult eachCheckLoginAndAuthentication(LoginOrRequestAccessControl accessControl);
}
package org.apache.rocketmq.acl.plug.engine;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.acl.plug.AccessContralAnalysis;
import org.apache.rocketmq.acl.plug.Authentication;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.entity.AuthenticationInfo;
import org.apache.rocketmq.acl.plug.entity.AuthenticationResult;
import org.apache.rocketmq.acl.plug.entity.LoginOrRequestAccessControl;
import org.apache.rocketmq.acl.plug.strategy.NetaddressStrategy;
import org.apache.rocketmq.acl.plug.strategy.NetaddressStrategyFactory;
public abstract class AuthenticationInfoManagementAclPlugEngine implements AclPlugEngine {
private Map<String/**account **/ , Map<String/**netaddress**/ , AuthenticationInfo>> accessControlMap = new HashMap<>();
private AuthenticationInfo authenticationInfo;
private NetaddressStrategyFactory netaddressStrategyFactory = new NetaddressStrategyFactory();
private AccessContralAnalysis accessContralAnalysis = new AccessContralAnalysis();
private Authentication authentication = new Authentication();
public void setAccessControl(AccessControl accessControl) {
try {
NetaddressStrategy netaddressStrategy = netaddressStrategyFactory.getNetaddressStrategy(accessControl);
Map<String , AuthenticationInfo> accessControlAddressMap = accessControlMap.get(accessControl.getAccount());
if(accessControlAddressMap == null ) {
accessControlAddressMap = new HashMap<>();
accessControlMap.put(accessControl.getAccount(), accessControlAddressMap);
}
accessControlAddressMap.put(accessControl.getNetaddress(), new AuthenticationInfo(accessContralAnalysis.analysis(accessControl),accessControl ,netaddressStrategy));
}catch(Exception e) {
// TODO Exception
}
}
public void setAccessControlList(List<AccessControl> AccessControlList) {
for(AccessControl accessControl : AccessControlList) {
setAccessControl(accessControl);
}
}
public void setNetaddressAccessControl(AccessControl accessControl) {
authenticationInfo = new AuthenticationInfo(accessContralAnalysis.analysis(accessControl) , accessControl, netaddressStrategyFactory.getNetaddressStrategy(accessControl));
}
public AuthenticationInfo getAccessControl(AccessControl accessControl) {
AuthenticationInfo existing = null;
if( accessControl.getAccount() == null && authenticationInfo != null) {
existing = authenticationInfo.getNetaddressStrategy().match(accessControl)?authenticationInfo:null;
}else {
Map<String, AuthenticationInfo> accessControlAddressMap = accessControlMap.get(accessControl.getAccount());
if(accessControlAddressMap != null ) {
existing = accessControlAddressMap.get(accessControl.getNetaddress());
if(existing.getAccessControl().getPassword().equals(accessControl.getPassword())) {
if( existing.getNetaddressStrategy().match(accessControl)) {
return existing;
}
}
existing = null;
}
}
return existing;
}
@Override
public AuthenticationResult eachCheckLoginAndAuthentication(LoginOrRequestAccessControl accessControl) {
AuthenticationResult authenticationResult = new AuthenticationResult();
AuthenticationInfo authenticationInfo = getAuthenticationInfo(accessControl , authenticationResult);
if(authenticationInfo != null) {
boolean boo = authentication.authentication(authenticationInfo, accessControl,authenticationResult);
authenticationResult.setSucceed( boo );
}
return authenticationResult;
}
protected abstract AuthenticationInfo getAuthenticationInfo(LoginOrRequestAccessControl accessControl , AuthenticationResult authenticationResult);
}
package org.apache.rocketmq.acl.plug.engine;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.entity.AuthenticationInfo;
import org.apache.rocketmq.acl.plug.entity.AuthenticationResult;
import org.apache.rocketmq.acl.plug.entity.LoginInfo;
import org.apache.rocketmq.acl.plug.entity.LoginOrRequestAccessControl;
public abstract class LoginInfoAclPlugEngine extends AuthenticationInfoManagementAclPlugEngine {
private Map<String, LoginInfo> loginInfoMap = new ConcurrentHashMap<>();
@Override
public AuthenticationInfo getAccessControl(AccessControl accessControl) {
AuthenticationInfo authenticationInfo = super.getAccessControl(accessControl);
LoginInfo loginInfo = new LoginInfo();
loginInfo.setAuthenticationInfo(authenticationInfo);
loginInfoMap.put(accessControl.getRecognition(), loginInfo);
return authenticationInfo;
}
public LoginInfo getLoginInfo(AccessControl accessControl) {
LoginInfo loginInfo = loginInfoMap.get(accessControl.getRecognition());
if (loginInfo == null) {
getAccessControl(accessControl);
loginInfo = loginInfoMap.get(accessControl.getRecognition());
}
if (loginInfo != null) {
loginInfo.setOperationTime(System.currentTimeMillis());
}
return loginInfo;
}
protected AuthenticationInfo getAuthenticationInfo(LoginOrRequestAccessControl accessControl , AuthenticationResult authenticationResult) {
LoginInfo anthenticationInfo = getLoginInfo(accessControl);
if(anthenticationInfo != null) {
return anthenticationInfo.getAuthenticationInfo();
}else {
authenticationResult.setResultString("Login information does not exist");
}
return null;
}
}
package org.apache.rocketmq.acl.plug.engine;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.entity.BorkerAccessControlTransport;
import org.yaml.snakeyaml.Yaml;
public class PlainAclPlugEngine extends LoginInfoAclPlugEngine {
public PlainAclPlugEngine() {
init();
}
void init() {
Yaml ymal = new Yaml();
BorkerAccessControlTransport transport = ymal.loadAs(PlainAclPlugEngine.class.getClassLoader().getResourceAsStream( "transport.yml"), BorkerAccessControlTransport.class);
super.setNetaddressAccessControl(transport.getOnlyNetAddress());
for(AccessControl accessControl : transport.getList()) {
super.setAccessControl(accessControl);
}
}
}
package org.apache.rocketmq.acl.plug.entity;
public class AccessControl {
private String account;
private String password;
private String netaddress;
private String recognition;
public AccessControl() {
}
public String getAccount() {
return account;
}
public void setAccount(String account) {
this.account = account;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getNetaddress() {
return netaddress;
}
public void setNetaddress(String netaddress) {
this.netaddress = netaddress;
}
public String getRecognition() {
return recognition;
}
public void setRecognition(String recognition) {
this.recognition = recognition;
}
@Override
public String toString() {
return "AccessControl [account=" + account + ", password=" + password + ", netaddress=" + netaddress
+ ", recognition=" + recognition + "]";
}
}
package org.apache.rocketmq.acl.plug.entity;
import java.util.Map;
import org.apache.rocketmq.acl.plug.strategy.NetaddressStrategy;
public class AuthenticationInfo {
private AccessControl accessControl;
private NetaddressStrategy netaddressStrategy;
private Map<Integer, Boolean> authority;
public AuthenticationInfo(Map<Integer, Boolean> authority , AccessControl accessControl, NetaddressStrategy netaddressStrategy) {
super();
this.authority = authority;
this.accessControl = accessControl;
this.netaddressStrategy = netaddressStrategy;
}
public AccessControl getAccessControl() {
return accessControl;
}
public void setAccessControl(AccessControl accessControl) {
this.accessControl = accessControl;
}
public NetaddressStrategy getNetaddressStrategy() {
return netaddressStrategy;
}
public void setNetaddressStrategy(NetaddressStrategy netaddressStrategy) {
this.netaddressStrategy = netaddressStrategy;
}
public Map<Integer, Boolean> getAuthority() {
return authority;
}
public void setAuthority(Map<Integer, Boolean> authority) {
this.authority = authority;
}
@Override
public String toString() {
return "AuthenticationInfo [accessControl=" + accessControl + ", netaddressStrategy=" + netaddressStrategy
+ ", authority=" + authority + "]";
}
}
package org.apache.rocketmq.acl.plug.entity;
public class AuthenticationResult {
private AccessControl accessControl;
private boolean succeed;
private Exception exception;
private String resultString;
public AccessControl getAccessControl() {
return accessControl;
}
public void setAccessControl(AccessControl accessControl) {
this.accessControl = accessControl;
}
public boolean isSucceed() {
return succeed;
}
public void setSucceed(boolean succeed) {
this.succeed = succeed;
}
public Exception getException() {
return exception;
}
public void setException(Exception exception) {
this.exception = exception;
}
public String getResultString() {
return resultString;
}
public void setResultString(String resultString) {
this.resultString = resultString;
}
}
package org.apache.rocketmq.acl.plug.entity;
import java.util.HashSet;
import java.util.Set;
import org.apache.rocketmq.acl.plug.annotation.RequestCode;
/**
* @author Administrator
*
*/
public class BorkerAccessControl extends AccessControl{
public BorkerAccessControl() {
}
private Set<String> permitSendTopic = new HashSet<>();
private Set<String> noPermitSendTopic = new HashSet<>();
private Set<String> permitPullTopic = new HashSet<>();
private Set<String> noPermitPullTopic = new HashSet<>();
@RequestCode(code = 10)
private boolean sendMessage = true;
@RequestCode(code = 310)
private boolean sendMessageV2 = true;
@RequestCode(code = 320)
private boolean sendBatchMessage = true;
@RequestCode(code = 36)
private boolean consumerSendMsgBack = true;
@RequestCode(code = 11)
private boolean pullMessage = true;
@RequestCode(code = 12)
private boolean queryMessage = true;
@RequestCode(code = 33)
private boolean viewMessageById = true;
@RequestCode(code = 34)
private boolean heartBeat = true;
@RequestCode(code = 35)
private boolean unregisterClient = true;
@RequestCode(code = 46)
private boolean checkClientConfig = true;
@RequestCode(code = 38)
private boolean getConsumerListByGroup = true;
@RequestCode(code = 15)
private boolean updateConsumerOffset = true;
@RequestCode(code = 14)
private boolean queryConsumerOffset = true;
@RequestCode(code = 37)
private boolean endTransaction = true;
@RequestCode(code = 17)
private boolean updateAndCreateTopic = true;
@RequestCode(code = 215)
private boolean deleteTopicInbroker =true;
@RequestCode(code = 21)
private boolean getAllTopicConfig = true;
@RequestCode(code = 25)
private boolean updateBrokerConfig = true;
@RequestCode(code = 26)
private boolean getBrokerConfig = true;
@RequestCode(code = 29)
private boolean searchOffsetByTimestamp = true;
@RequestCode(code = 30)
private boolean getMaxOffset = true;
@RequestCode(code = 31)
private boolean getMixOffset = true;
@RequestCode(code = 32)
private boolean getEarliestMsgStoretime = true;
@RequestCode(code = 28)
private boolean getBrokerRuntimeInfo = true;
@RequestCode(code = 41)
private boolean lockBatchMQ = true;
@RequestCode(code = 42)
private boolean unlockBatchMQ = true;
@RequestCode(code = 200)
private boolean updateAndCreteSubscriptiongroup = true;
@RequestCode(code = 201)
private boolean getAllSubscriptiongroupConfig = true;
@RequestCode(code = 207)
private boolean deleteSubscriptiongroup = true;
@RequestCode(code = 202)
private boolean getTopicStatsInfo = true;
@RequestCode(code = 203)
private boolean getConsumerConnectionList = true;
@RequestCode(code = 204)
private boolean getProducerConnectionList = true;
@RequestCode(code = 208)
private boolean getConsumeStats = true;
@RequestCode(code = 43)
private boolean getAllConsumerOffset = true;
@RequestCode(code = 25)
private boolean getAllDelayOffset = true;
@RequestCode(code = 222)
private boolean invokeBrokerToresetOffset = true;
@RequestCode(code = 300)
private boolean queryTopicConsumByWho = true;
@RequestCode(code = 301)
private boolean registerFilterServer = true;
@RequestCode(code = 303)
private boolean queryConsumeTimeSpan = true;
@RequestCode(code = 305)
private boolean getSystemTopicListFromBroker = true;
@RequestCode(code = 306)
private boolean cleanExpiredConsumequeue = true;
@RequestCode(code = 316)
private boolean cleanUnusedTopic = true;
@RequestCode(code = 307)
private boolean getConsumerRunningInfo = true;
@RequestCode(code = 308)
private boolean queryCorrectionOffset = true;
@RequestCode(code = 309)
private boolean consumeMessageDirectly = true;
@RequestCode(code = 314)
private boolean cloneGroupOffset = true;
@RequestCode(code = 315)
private boolean viewBrokerStatsData = true;
@RequestCode(code = 317)
private boolean getBrokerConsumeStats = true;
@RequestCode(code = 321)
private boolean queryConsumeQueue = true;
public Set<String> getPermitSendTopic() {
return permitSendTopic;
}
public void setPermitSendTopic(Set<String> permitSendTopic) {
this.permitSendTopic = permitSendTopic;
}
public Set<String> getNoPermitSendTopic() {
return noPermitSendTopic;
}
public void setNoPermitSendTopic(Set<String> noPermitSendTopic) {
this.noPermitSendTopic = noPermitSendTopic;
}
public Set<String> getPermitPullTopic() {
return permitPullTopic;
}
public void setPermitPullTopic(Set<String> permitPullTopic) {
this.permitPullTopic = permitPullTopic;
}
public Set<String> getNoPermitPullTopic() {
return noPermitPullTopic;
}
public void setNoPermitPullTopic(Set<String> noPermitPullTopic) {
this.noPermitPullTopic = noPermitPullTopic;
}
public boolean isSendMessage() {
return sendMessage;
}
public void setSendMessage(boolean sendMessage) {
this.sendMessage = sendMessage;
}
public boolean isSendMessageV2() {
return sendMessageV2;
}
public void setSendMessageV2(boolean sendMessageV2) {
this.sendMessageV2 = sendMessageV2;
}
public boolean isSendBatchMessage() {
return sendBatchMessage;
}
public void setSendBatchMessage(boolean sendBatchMessage) {
this.sendBatchMessage = sendBatchMessage;
}
public boolean isConsumerSendMsgBack() {
return consumerSendMsgBack;
}
public void setConsumerSendMsgBack(boolean consumerSendMsgBack) {
this.consumerSendMsgBack = consumerSendMsgBack;
}
public boolean isPullMessage() {
return pullMessage;
}
public void setPullMessage(boolean pullMessage) {
this.pullMessage = pullMessage;
}
public boolean isQueryMessage() {
return queryMessage;
}
public void setQueryMessage(boolean queryMessage) {
this.queryMessage = queryMessage;
}
public boolean isViewMessageById() {
return viewMessageById;
}
public void setViewMessageById(boolean viewMessageById) {
this.viewMessageById = viewMessageById;
}
public boolean isHeartBeat() {
return heartBeat;
}
public void setHeartBeat(boolean heartBeat) {
this.heartBeat = heartBeat;
}
public boolean isUnregisterClient() {
return unregisterClient;
}
public void setUnregisterClient(boolean unregisterClient) {
this.unregisterClient = unregisterClient;
}
public boolean isCheckClientConfig() {
return checkClientConfig;
}
public void setCheckClientConfig(boolean checkClientConfig) {
this.checkClientConfig = checkClientConfig;
}
public boolean isGetConsumerListByGroup() {
return getConsumerListByGroup;
}
public void setGetConsumerListByGroup(boolean getConsumerListByGroup) {
this.getConsumerListByGroup = getConsumerListByGroup;
}
public boolean isUpdateConsumerOffset() {
return updateConsumerOffset;
}
public void setUpdateConsumerOffset(boolean updateConsumerOffset) {
this.updateConsumerOffset = updateConsumerOffset;
}
public boolean isQueryConsumerOffset() {
return queryConsumerOffset;
}
public void setQueryConsumerOffset(boolean queryConsumerOffset) {
this.queryConsumerOffset = queryConsumerOffset;
}
public boolean isEndTransaction() {
return endTransaction;
}
public void setEndTransaction(boolean endTransaction) {
this.endTransaction = endTransaction;
}
public boolean isUpdateAndCreateTopic() {
return updateAndCreateTopic;
}
public void setUpdateAndCreateTopic(boolean updateAndCreateTopic) {
this.updateAndCreateTopic = updateAndCreateTopic;
}
public boolean isDeleteTopicInbroker() {
return deleteTopicInbroker;
}
public void setDeleteTopicInbroker(boolean deleteTopicInbroker) {
this.deleteTopicInbroker = deleteTopicInbroker;
}
public boolean isGetAllTopicConfig() {
return getAllTopicConfig;
}
public void setGetAllTopicConfig(boolean getAllTopicConfig) {
this.getAllTopicConfig = getAllTopicConfig;
}
public boolean isUpdateBrokerConfig() {
return updateBrokerConfig;
}
public void setUpdateBrokerConfig(boolean updateBrokerConfig) {
this.updateBrokerConfig = updateBrokerConfig;
}
public boolean isGetBrokerConfig() {
return getBrokerConfig;
}
public void setGetBrokerConfig(boolean getBrokerConfig) {
this.getBrokerConfig = getBrokerConfig;
}
public boolean isSearchOffsetByTimestamp() {
return searchOffsetByTimestamp;
}
public void setSearchOffsetByTimestamp(boolean searchOffsetByTimestamp) {
this.searchOffsetByTimestamp = searchOffsetByTimestamp;
}
public boolean isGetMaxOffset() {
return getMaxOffset;
}
public void setGetMaxOffset(boolean getMaxOffset) {
this.getMaxOffset = getMaxOffset;
}
public boolean isGetMixOffset() {
return getMixOffset;
}
public void setGetMixOffset(boolean getMixOffset) {
this.getMixOffset = getMixOffset;
}
public boolean isGetEarliestMsgStoretime() {
return getEarliestMsgStoretime;
}
public void setGetEarliestMsgStoretime(boolean getEarliestMsgStoretime) {
this.getEarliestMsgStoretime = getEarliestMsgStoretime;
}
public boolean isGetBrokerRuntimeInfo() {
return getBrokerRuntimeInfo;
}
public void setGetBrokerRuntimeInfo(boolean getBrokerRuntimeInfo) {
this.getBrokerRuntimeInfo = getBrokerRuntimeInfo;
}
public boolean isLockBatchMQ() {
return lockBatchMQ;
}
public void setLockBatchMQ(boolean lockBatchMQ) {
this.lockBatchMQ = lockBatchMQ;
}
public boolean isUnlockBatchMQ() {
return unlockBatchMQ;
}
public void setUnlockBatchMQ(boolean unlockBatchMQ) {
this.unlockBatchMQ = unlockBatchMQ;
}
public boolean isUpdateAndCreteSubscriptiongroup() {
return updateAndCreteSubscriptiongroup;
}
public void setUpdateAndCreteSubscriptiongroup(boolean updateAndCreteSubscriptiongroup) {
this.updateAndCreteSubscriptiongroup = updateAndCreteSubscriptiongroup;
}
public boolean isGetAllSubscriptiongroupConfig() {
return getAllSubscriptiongroupConfig;
}
public void setGetAllSubscriptiongroupConfig(boolean getAllSubscriptiongroupConfig) {
this.getAllSubscriptiongroupConfig = getAllSubscriptiongroupConfig;
}
public boolean isDeleteSubscriptiongroup() {
return deleteSubscriptiongroup;
}
public void setDeleteSubscriptiongroup(boolean deleteSubscriptiongroup) {
this.deleteSubscriptiongroup = deleteSubscriptiongroup;
}
public boolean isGetTopicStatsInfo() {
return getTopicStatsInfo;
}
public void setGetTopicStatsInfo(boolean getTopicStatsInfo) {
this.getTopicStatsInfo = getTopicStatsInfo;
}
public boolean isGetConsumerConnectionList() {
return getConsumerConnectionList;
}
public void setGetConsumerConnectionList(boolean getConsumerConnectionList) {
this.getConsumerConnectionList = getConsumerConnectionList;
}
public boolean isGetProducerConnectionList() {
return getProducerConnectionList;
}
public void setGetProducerConnectionList(boolean getProducerConnectionList) {
this.getProducerConnectionList = getProducerConnectionList;
}
public boolean isGetConsumeStats() {
return getConsumeStats;
}
public void setGetConsumeStats(boolean getConsumeStats) {
this.getConsumeStats = getConsumeStats;
}
public boolean isGetAllConsumerOffset() {
return getAllConsumerOffset;
}
public void setGetAllConsumerOffset(boolean getAllConsumerOffset) {
this.getAllConsumerOffset = getAllConsumerOffset;
}
public boolean isGetAllDelayOffset() {
return getAllDelayOffset;
}
public void setGetAllDelayOffset(boolean getAllDelayOffset) {
this.getAllDelayOffset = getAllDelayOffset;
}
public boolean isInvokeBrokerToresetOffset() {
return invokeBrokerToresetOffset;
}
public void setInvokeBrokerToresetOffset(boolean invokeBrokerToresetOffset) {
this.invokeBrokerToresetOffset = invokeBrokerToresetOffset;
}
public boolean isQueryTopicConsumByWho() {
return queryTopicConsumByWho;
}
public void setQueryTopicConsumByWho(boolean queryTopicConsumByWho) {
this.queryTopicConsumByWho = queryTopicConsumByWho;
}
public boolean isRegisterFilterServer() {
return registerFilterServer;
}
public void setRegisterFilterServer(boolean registerFilterServer) {
this.registerFilterServer = registerFilterServer;
}
public boolean isQueryConsumeTimeSpan() {
return queryConsumeTimeSpan;
}
public void setQueryConsumeTimeSpan(boolean queryConsumeTimeSpan) {
this.queryConsumeTimeSpan = queryConsumeTimeSpan;
}
public boolean isGetSystemTopicListFromBroker() {
return getSystemTopicListFromBroker;
}
public void setGetSystemTopicListFromBroker(boolean getSystemTopicListFromBroker) {
this.getSystemTopicListFromBroker = getSystemTopicListFromBroker;
}
public boolean isCleanExpiredConsumequeue() {
return cleanExpiredConsumequeue;
}
public void setCleanExpiredConsumequeue(boolean cleanExpiredConsumequeue) {
this.cleanExpiredConsumequeue = cleanExpiredConsumequeue;
}
public boolean isCleanUnusedTopic() {
return cleanUnusedTopic;
}
public void setCleanUnusedTopic(boolean cleanUnusedTopic) {
this.cleanUnusedTopic = cleanUnusedTopic;
}
public boolean isGetConsumerRunningInfo() {
return getConsumerRunningInfo;
}
public void setGetConsumerRunningInfo(boolean getConsumerRunningInfo) {
this.getConsumerRunningInfo = getConsumerRunningInfo;
}
public boolean isQueryCorrectionOffset() {
return queryCorrectionOffset;
}
public void setQueryCorrectionOffset(boolean queryCorrectionOffset) {
this.queryCorrectionOffset = queryCorrectionOffset;
}
public boolean isConsumeMessageDirectly() {
return consumeMessageDirectly;
}
public void setConsumeMessageDirectly(boolean consumeMessageDirectly) {
this.consumeMessageDirectly = consumeMessageDirectly;
}
public boolean isCloneGroupOffset() {
return cloneGroupOffset;
}
public void setCloneGroupOffset(boolean cloneGroupOffset) {
this.cloneGroupOffset = cloneGroupOffset;
}
public boolean isViewBrokerStatsData() {
return viewBrokerStatsData;
}
public void setViewBrokerStatsData(boolean viewBrokerStatsData) {
this.viewBrokerStatsData = viewBrokerStatsData;
}
public boolean isGetBrokerConsumeStats() {
return getBrokerConsumeStats;
}
public void setGetBrokerConsumeStats(boolean getBrokerConsumeStats) {
this.getBrokerConsumeStats = getBrokerConsumeStats;
}
public boolean isQueryConsumeQueue() {
return queryConsumeQueue;
}
public void setQueryConsumeQueue(boolean queryConsumeQueue) {
this.queryConsumeQueue = queryConsumeQueue;
}
@Override
public String toString() {
return "BorkerAccessControl [permitSendTopic=" + permitSendTopic + ", noPermitSendTopic=" + noPermitSendTopic
+ ", permitPullTopic=" + permitPullTopic + ", noPermitPullTopic=" + noPermitPullTopic + ", sendMessage="
+ sendMessage + ", sendMessageV2=" + sendMessageV2 + ", sendBatchMessage=" + sendBatchMessage
+ ", consumerSendMsgBack=" + consumerSendMsgBack + ", pullMessage=" + pullMessage + ", queryMessage="
+ queryMessage + ", viewMessageById=" + viewMessageById + ", heartBeat=" + heartBeat
+ ", unregisterClient=" + unregisterClient + ", checkClientConfig=" + checkClientConfig
+ ", getConsumerListByGroup=" + getConsumerListByGroup + ", updateConsumerOffset="
+ updateConsumerOffset + ", queryConsumerOffset=" + queryConsumerOffset + ", endTransaction="
+ endTransaction + ", updateAndCreateTopic=" + updateAndCreateTopic + ", deleteTopicInbroker="
+ deleteTopicInbroker + ", getAllTopicConfig=" + getAllTopicConfig + ", updateBrokerConfig="
+ updateBrokerConfig + ", getBrokerConfig=" + getBrokerConfig + ", searchOffsetByTimestamp="
+ searchOffsetByTimestamp + ", getMaxOffset=" + getMaxOffset + ", getMixOffset=" + getMixOffset
+ ", getEarliestMsgStoretime=" + getEarliestMsgStoretime + ", getBrokerRuntimeInfo="
+ getBrokerRuntimeInfo + ", lockBatchMQ=" + lockBatchMQ + ", unlockBatchMQ=" + unlockBatchMQ
+ ", updateAndCreteSubscriptiongroup=" + updateAndCreteSubscriptiongroup
+ ", getAllSubscriptiongroupConfig=" + getAllSubscriptiongroupConfig + ", deleteSubscriptiongroup="
+ deleteSubscriptiongroup + ", getTopicStatsInfo=" + getTopicStatsInfo + ", getConsumerConnectionList="
+ getConsumerConnectionList + ", getProducerConnectionList=" + getProducerConnectionList
+ ", getConsumeStats=" + getConsumeStats + ", getAllConsumerOffset=" + getAllConsumerOffset
+ ", getAllDelayOffset=" + getAllDelayOffset + ", invokeBrokerToresetOffset="
+ invokeBrokerToresetOffset + ", queryTopicConsumByWho=" + queryTopicConsumByWho
+ ", registerFilterServer=" + registerFilterServer + ", queryConsumeTimeSpan=" + queryConsumeTimeSpan
+ ", getSystemTopicListFromBroker=" + getSystemTopicListFromBroker + ", cleanExpiredConsumequeue="
+ cleanExpiredConsumequeue + ", cleanUnusedTopic=" + cleanUnusedTopic + ", getConsumerRunningInfo="
+ getConsumerRunningInfo + ", queryCorrectionOffset=" + queryCorrectionOffset
+ ", consumeMessageDirectly=" + consumeMessageDirectly + ", cloneGroupOffset=" + cloneGroupOffset
+ ", viewBrokerStatsData=" + viewBrokerStatsData + ", getBrokerConsumeStats=" + getBrokerConsumeStats
+ ", queryConsumeQueue=" + queryConsumeQueue + ", toString()=" + super.toString() + "]";
}
}
package org.apache.rocketmq.acl.plug.entity;
import java.util.List;
public class BorkerAccessControlTransport {
private BorkerAccessControl onlyNetAddress;
private List<BorkerAccessControl> list;
public BorkerAccessControlTransport() {
super();
}
public BorkerAccessControl getOnlyNetAddress() {
return onlyNetAddress;
}
public void setOnlyNetAddress(BorkerAccessControl onlyNetAddress) {
this.onlyNetAddress = onlyNetAddress;
}
public List<BorkerAccessControl> getList() {
return list;
}
public void setList(List<BorkerAccessControl> list) {
this.list = list;
}
@Override
public String toString() {
return "BorkerAccessControlTransport [onlyNetAddress=" + onlyNetAddress + ", list=" + list + "]";
}
}
package org.apache.rocketmq.acl.plug.entity;
public class ControllerParametersEntity {
}
package org.apache.rocketmq.acl.plug.entity;
public class LoginInfo {
private String recognition;
private long loginTime = System.currentTimeMillis();
private long operationTime = loginTime;
private AuthenticationInfo authenticationInfo;
public AuthenticationInfo getAuthenticationInfo() {
return authenticationInfo;
}
public void setAuthenticationInfo(AuthenticationInfo authenticationInfo) {
this.authenticationInfo = authenticationInfo;
}
public String getRecognition() {
return recognition;
}
public void setRecognition(String recognition) {
this.recognition = recognition;
}
public long getLoginTime() {
return loginTime;
}
public void setLoginTime(long loginTime) {
this.loginTime = loginTime;
}
public long getOperationTime() {
return operationTime;
}
public void setOperationTime(long operationTime) {
this.operationTime = operationTime;
}
@Override
public String toString() {
return "LoginInfo [recognition=" + recognition + ", loginTime=" + loginTime + ", operationTime=" + operationTime
+ ", authenticationInfo=" + authenticationInfo + "]";
}
}
package org.apache.rocketmq.acl.plug.entity;
/**
* @author Administrator
*
*/
public class LoginOrRequestAccessControl extends AccessControl {
private int code;
private String topic;
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("LoginOrRequestAccessControl [code=").append(code).append(", topic=").append(topic).append("]");
return builder.toString();
}
}
package org.apache.rocketmq.acl.plug.strategy;
import org.apache.rocketmq.acl.plug.AclUtils;
public abstract class AbstractNetaddressStrategy implements NetaddressStrategy {
public void verify(String netaddress , int index) {
AclUtils.isScope(netaddress, index);
}
}
package org.apache.rocketmq.acl.plug.strategy;
import java.util.HashSet;
import java.util.Set;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
public class MultipleNetaddressStrategy extends AbstractNetaddressStrategy {
private final Set<String> multipleSet = new HashSet<>();
public MultipleNetaddressStrategy(String[] strArray) {
for(String netaddress : strArray) {
verify(netaddress, 4);
multipleSet.add(netaddress);
}
}
@Override
public boolean match(AccessControl accessControl) {
return multipleSet.contains(accessControl.getNetaddress());
}
}
package org.apache.rocketmq.acl.plug.strategy;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
public interface NetaddressStrategy {
public boolean match(AccessControl accessControl);
}
package org.apache.rocketmq.acl.plug.strategy;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.plug.AclUtils;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
public class NetaddressStrategyFactory {
public NetaddressStrategy getNetaddressStrategy(AccessControl accessControl ) {
String netaddress = accessControl.getNetaddress();
if(StringUtils.isBlank(netaddress) || "*".equals(netaddress) ) {//*
return NullNetaddressStrategy.NULL_NET_ADDRESS_STRATEGY;
}
if(netaddress.endsWith("}")) {//1.1.1.{1,2,3,4,5}
String[] strArray = StringUtils.split(netaddress);
String four = strArray[3];
if(!four.startsWith("{")) {
}
return new MultipleNetaddressStrategy(AclUtils.getAddreeStrArray(netaddress, four));
}else if(AclUtils.isColon(netaddress)) {//1.1.1.1,1.2.3.4.5
return new MultipleNetaddressStrategy( StringUtils.split(","));
}else if(AclUtils.isAsterisk(netaddress) || AclUtils.isMinus(netaddress)) {//1.2.*.* , 1.1.1.1-5 ,1.1.1-5.*
return new RangeNetaddressStrategy(netaddress);
}
return new OneNetaddressStrategy(netaddress);
}
}
package org.apache.rocketmq.acl.plug.strategy;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
public class NullNetaddressStrategy implements NetaddressStrategy {
public static final NullNetaddressStrategy NULL_NET_ADDRESS_STRATEGY = new NullNetaddressStrategy();
@Override
public boolean match(AccessControl accessControl) {
return true;
}
}
package org.apache.rocketmq.acl.plug.strategy;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
public class OneNetaddressStrategy extends AbstractNetaddressStrategy {
private String netaddress;
public OneNetaddressStrategy(String netaddress) {
this.netaddress = netaddress;
}
@Override
public boolean match(AccessControl accessControl) {
return netaddress.equals(accessControl.getNetaddress());
}
}
package org.apache.rocketmq.acl.plug.strategy;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.plug.AclUtils;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
public class RangeNetaddressStrategy extends AbstractNetaddressStrategy {
private String head;
private int start;
private int end;
private int index;
public RangeNetaddressStrategy(String netaddress) {
String[] strArray = StringUtils.split(netaddress , ".");
if( analysis(strArray , 2) ||analysis(strArray , 3) ) {
verify(netaddress, index);
StringBuffer sb = new StringBuffer().append( strArray[0].trim()).append(".").append( strArray[1].trim()).append(".");
if(index == 3) {
sb.append( strArray[2].trim()).append(".");
}
this.head = sb.toString();
}
}
private boolean analysis(String[] strArray , int index ) {
String value = strArray[index].trim();
this.index = index;
if( "*".equals( value) ){
setValue(0, 255);
}else if(AclUtils.isMinus( value )) {
String[] valueArray = StringUtils.split( value , "-" );
this.start = Integer.valueOf(valueArray[0]);
this.end = Integer.valueOf(valueArray[1]);
if ( !(AclUtils.isScope( end ) && AclUtils.isScope( start ) && start <= end)) {
}
}
return this.end > 0 ? true : false;
}
private void setValue(int start , int end) {
this.start = start ;
this.end = end;
}
@Override
public boolean match(AccessControl accessControl) {
String netAddress = accessControl.getNetaddress();
if ( netAddress.startsWith(this.head)) {
String value;
if(index == 3) {
value = netAddress.substring(this.head.length());
}else {
value = netAddress.substring(this.head.length() , netAddress.lastIndexOf('.'));
}
Integer address = Integer.valueOf(value);
if( address>= this.start && address <= this.end ) {
return true;
}
}
return false;
}
}
package org.apache.rocketmq.acl.plug;
import java.util.Map;
import org.apache.rocketmq.acl.plug.entity.BorkerAccessControl;
import org.junit.Test;
public class AccessContralAnalysisTest {
@Test
public void analysisTest() {
AccessContralAnalysis accessContralAnalysis = new AccessContralAnalysis();
Map<Integer, Boolean> map = accessContralAnalysis.analysis(new BorkerAccessControl());
System.out.println(map);
}
}
package org.apache.rocketmq.acl.plug.engine;
import org.junit.Test;
public class PlainAclPlugEngineTest {
@Test
public void testPlainAclPlugEngineInit() {
PlainAclPlugEngine plainAclPlugEngine = new PlainAclPlugEngine();
plainAclPlugEngine.init();
}
}
onlyNetAddress:
netaddress: 10.10.103.*
noPermitPullTopic:
- broker-a
list:
- account: laohu
password: 123456
netaddress: 192.0.0.*
permitSendTopic:
- test1
- test2
- account: laohu
password: 123456
netaddress: 192.0.2.1
permitSendTopic:
- test3
- test4
\ No newline at end of file
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. The ASF licenses this file to
You under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
......@@ -52,6 +48,10 @@
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-filter</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-acl-plug</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
......
......@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
......@@ -31,6 +32,11 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.plug.AclPlugController;
import org.apache.rocketmq.acl.plug.AclRemotingServer;
import org.apache.rocketmq.acl.plug.entity.LoginOrRequestAccessControl;
import org.apache.rocketmq.broker.client.ClientHousekeepingService;
import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
import org.apache.rocketmq.broker.client.ConsumerManager;
......@@ -91,6 +97,7 @@ import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.srvutil.FileWatchService;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageArrivingListener;
......@@ -458,6 +465,7 @@ public class BrokerController {
}
}
initialTransaction();
initialAclPlug();
}
return result;
}
......@@ -477,6 +485,42 @@ public class BrokerController {
this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
}
private void initialAclPlug() {
try {
if(!this.brokerConfig.isAclPlug()) {
return;
}
AclPlugController aclPlugController = new AclPlugController(null);
if(!aclPlugController.isStartSucceed()) {
return;
}
final AclRemotingServer aclRemotingServe = aclPlugController.getAclRemotingServer();
this.registerServerRPCHook(new RPCHook() {
@Override
public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
HashMap<String, String> extFields = request.getExtFields();
LoginOrRequestAccessControl accessControl = new LoginOrRequestAccessControl();
accessControl.setCode(request.getCode());
accessControl.setRecognition(remoteAddr);
if( extFields != null ) {
accessControl.setAccount(extFields.get("account"));
accessControl.setPassword(extFields.get("password"));
accessControl.setNetaddress(StringUtils.split(remoteAddr,":")[0]);
accessControl.setTopic(extFields.get("topic"));
}
aclRemotingServe.eachCheck(accessControl);
}
@Override
public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {}
});
}catch(Exception e) {
}
}
public void registerProcessor() {
/**
* SendMessageProcessor
......
......@@ -164,6 +164,9 @@ public class BrokerConfig {
@ImportantField
private long transactionCheckInterval = 60 * 1000;
private boolean isAclPlug;
public boolean isTraceOn() {
return traceOn;
}
......@@ -701,4 +704,13 @@ public class BrokerConfig {
public void setTransactionCheckInterval(long transactionCheckInterval) {
this.transactionCheckInterval = transactionCheckInterval;
}
public boolean isAclPlug() {
return isAclPlug;
}
public void setAclPlug(boolean isAclPlug) {
this.isAclPlug = isAclPlug;
}
}
......@@ -20,3 +20,5 @@ deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
aclPlug=true
namesrvAddr=127.0.0.1:9876
......@@ -29,10 +29,10 @@ public class PullConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.start();
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1");
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("broker-a");
for (MessageQueue mq : mqs) {
System.out.printf("Consume from the queue: %s%n", mq);
SINGLE_MQ:
......
......@@ -24,6 +24,7 @@ import org.apache.rocketmq.common.message.MessageQueue;
public class PullConsumerTest {
public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.start();
try {
......
......@@ -125,6 +125,7 @@
<module>distribution</module>
<module>openmessaging</module>
<module>logging</module>
<module>acl-plug</module>
</modules>
<build>
......@@ -214,9 +215,9 @@
<execution>
<id>generate-effective-dependencies-pom</id>
<phase>generate-resources</phase>
<goals>
<!-- <goals>
<goal>effective-pom</goal>
</goals>
</goals> -->
<configuration>
<output>${project.build.directory}/effective-pom/effective-dependencies.xml</output>
</configuration>
......@@ -535,6 +536,11 @@
<artifactId>rocketmq-example</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-acl-plug</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册