diff --git a/acl-plug/pom.xml b/acl-plug/pom.xml
index 1cdc4a29da51b1a46e8385cb59a733f8a73e12b3..d91d4203400e4606f25aed61406cf431403dd2ce 100644
--- a/acl-plug/pom.xml
+++ b/acl-plug/pom.xml
@@ -18,8 +18,8 @@
rocketmq-all
4.4.0-SNAPSHOT
- rocketmq-acl-plug
- rocketmq-acl-plug ${project.version}
+ rocketmq-acl
+ rocketmq-acl ${project.version}
http://maven.apache.org
diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/AccessValidator.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/AccessValidator.java
index d573e56cf1fcfd59d68599a61e9523019fd0c98c..0b1b0823c508cf01bd43a3ad0ec5b9e230e5ac05 100644
--- a/acl-plug/src/main/java/org/apache/rocketmq/acl/AccessValidator.java
+++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/AccessValidator.java
@@ -22,14 +22,16 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public interface AccessValidator {
/**
* Parse to get the AccessResource(user, resource, needed permission)
+ *
* @param request
* @return
*/
- AccessResource parse(RemotingCommand request);
+ AccessResource parse(RemotingCommand request, String remoteAddr);
/**
* Validate the access resource.
+ *
* @param accessResource
*/
- void validate(AccessResource accessResource) ;
+ void validate(AccessResource accessResource);
}
diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/DefaultAccessValidator.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/DefaultAccessValidator.java
index 859cc809243b71d9fc4e5fc6cafafa387929f918..704ace47b7841ba8e0199a17d34b991f15be9f64 100644
--- a/acl-plug/src/main/java/org/apache/rocketmq/acl/DefaultAccessValidator.java
+++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/DefaultAccessValidator.java
@@ -21,11 +21,13 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class DefaultAccessValidator implements AccessValidator {
- @Override public AccessResource parse(RemotingCommand request) {
+ @Override
+ public AccessResource parse(RemotingCommand request, String remoteAddr) {
return null;
}
- @Override public void validate(AccessResource accessResource) {
+ @Override
+ public void validate(AccessResource accessResource) {
}
}
diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/AccessContralAnalysis.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/AccessContralAnalysis.java
index 75c907d82b4b7485d011145c8b8f3450d4545be9..1adf6d432eaeaa49872608eba9b23f9b560aea42 100644
--- a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/AccessContralAnalysis.java
+++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/AccessContralAnalysis.java
@@ -16,14 +16,15 @@
*/
package org.apache.rocketmq.acl.plug;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.plug.entity.AccessControl;
+import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
+
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.acl.plug.entity.AccessControl;
-import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
public class AccessContralAnalysis {
diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/Authentication.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/Authentication.java
index 901cc409d7882ed47ab9397e90c419ee85748e65..ae247e7220e43c495e9bc63e73df0ad852e1dd49 100644
--- a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/Authentication.java
+++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/Authentication.java
@@ -24,7 +24,7 @@ import org.apache.rocketmq.acl.plug.entity.BorkerAccessControl;
public class Authentication {
public boolean authentication(AuthenticationInfo authenticationInfo,
- AccessControl accessControl, AuthenticationResult authenticationResult) {
+ AccessControl accessControl, AuthenticationResult authenticationResult) {
int code = accessControl.getCode();
if (!authenticationInfo.getAuthority().get(code)) {
authenticationResult.setResultString(String.format("code is %d Authentication failed", code));
diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/DefaultAclRemotingServiceImpl.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/DefaultAclRemotingServiceImpl.java
index b42205abcf078011adf6eb7a0eb20aa4f644d038..0d5f949c987fc8e5fac08c8b44de6922fd0586c0 100644
--- a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/DefaultAclRemotingServiceImpl.java
+++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/DefaultAclRemotingServiceImpl.java
@@ -16,15 +16,28 @@
*/
package org.apache.rocketmq.acl.plug;
+import java.util.HashMap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.AccessResource;
+import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.acl.plug.engine.AclPlugEngine;
+import org.apache.rocketmq.acl.plug.engine.PlainAclPlugEngine;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.entity.AuthenticationResult;
+import org.apache.rocketmq.acl.plug.entity.ControllerParameters;
import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-public class DefaultAclRemotingServiceImpl implements AclRemotingService {
+public class DefaultAclRemotingServiceImpl implements AclRemotingService, AccessValidator {
private AclPlugEngine aclPlugEngine;
+ public DefaultAclRemotingServiceImpl() {
+ ControllerParameters controllerParameters = new ControllerParameters();
+ this.aclPlugEngine = new PlainAclPlugEngine(controllerParameters);
+ this.aclPlugEngine.initialize();
+ }
+
public DefaultAclRemotingServiceImpl(AclPlugEngine aclPlugEngine) {
this.aclPlugEngine = aclPlugEngine;
}
@@ -41,4 +54,34 @@ public class DefaultAclRemotingServiceImpl implements AclRemotingService {
return authenticationResult;
}
+ @Override
+ public AccessResource parse(RemotingCommand request, String remoteAddr) {
+ HashMap extFields = request.getExtFields();
+ AccessControl accessControl = new AccessControl();
+ accessControl.setCode(request.getCode());
+ accessControl.setRecognition(remoteAddr);
+ accessControl.setNetaddress(StringUtils.split(remoteAddr, ":")[0]);
+ if (extFields != null) {
+ accessControl.setAccount(extFields.get("account"));
+ accessControl.setPassword(extFields.get("password"));
+ accessControl.setTopic(extFields.get("topic"));
+ }
+ return accessControl;
+ }
+
+ @Override
+ public void validate(AccessResource accessResource) {
+ try {
+ AuthenticationResult authenticationResult = aclPlugEngine.eachCheckAuthentication((AccessControl) accessResource);
+ if (authenticationResult.getException() != null) {
+ throw new AclPlugRuntimeException(String.format("eachCheck the inspection appear exception, accessControl data is %s", accessResource.toString()), authenticationResult.getException());
+ }
+ if (authenticationResult.getAccessControl() == null || !authenticationResult.isSucceed()) {
+ throw new AclPlugRuntimeException(String.format("%s accessControl data is %s", authenticationResult.getResultString(), accessResource.toString()));
+ }
+ } catch (Exception e) {
+ throw new AclPlugRuntimeException(String.format("validate exception AccessResource data %s", accessResource.toString()), e);
+ }
+ }
+
}
diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/AclPlugEngine.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/AclPlugEngine.java
index badae946c1ee59b70db9fab9c4edf69be4084855..d1572755eeb6e6bdc9cb2703997decf160409eb9 100644
--- a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/AclPlugEngine.java
+++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/AclPlugEngine.java
@@ -31,5 +31,7 @@ public interface AclPlugEngine {
public AuthenticationResult eachCheckLoginAndAuthentication(AccessControl accessControl);
+ public AuthenticationResult eachCheckAuthentication(AccessControl accessControl);
+
public void initialize();
}
diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/AuthenticationInfoManagementAclPlugEngine.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/AuthenticationInfoManagementAclPlugEngine.java
index 6aac6bd438e0406abacc92c7034bee53d3430af8..a6399fc3e468198d86e6b1afb24036f90fc00530 100644
--- a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/AuthenticationInfoManagementAclPlugEngine.java
+++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/AuthenticationInfoManagementAclPlugEngine.java
@@ -16,9 +16,11 @@
*/
package org.apache.rocketmq.acl.plug.engine;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+
import org.apache.rocketmq.acl.plug.AccessContralAnalysis;
import org.apache.rocketmq.acl.plug.Authentication;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
@@ -37,7 +39,7 @@ public abstract class AuthenticationInfoManagementAclPlugEngine implements AclPl
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.ACL_PLUG_LOGGER_NAME);
ControllerParameters controllerParameters;
- private Map> accessControlMap = new HashMap<>();
+ private Map> accessControlMap = new HashMap<>();
private AuthenticationInfo authenticationInfo;
private NetaddressStrategyFactory netaddressStrategyFactory = new NetaddressStrategyFactory();
private AccessContralAnalysis accessContralAnalysis = new AccessContralAnalysis();
@@ -54,16 +56,16 @@ public abstract class AuthenticationInfoManagementAclPlugEngine implements AclPl
}
try {
NetaddressStrategy netaddressStrategy = netaddressStrategyFactory.getNetaddressStrategy(accessControl);
- Map accessControlAddressMap = accessControlMap.get(accessControl.getAccount());
- if (accessControlAddressMap == null) {
- accessControlAddressMap = new HashMap<>();
- accessControlMap.put(accessControl.getAccount(), accessControlAddressMap);
+ List accessControlAddressList = accessControlMap.get(accessControl.getAccount());
+ if (accessControlAddressList == null) {
+ accessControlAddressList = new ArrayList<>();
+ accessControlMap.put(accessControl.getAccount(), accessControlAddressList);
}
AuthenticationInfo authenticationInfo = new AuthenticationInfo(accessContralAnalysis.analysis(accessControl), accessControl, netaddressStrategy);
- accessControlAddressMap.put(accessControl.getNetaddress(), authenticationInfo);
+ accessControlAddressList.add(authenticationInfo);
log.info("authenticationInfo is {}", authenticationInfo.toString());
} catch (Exception e) {
- throw new AclPlugRuntimeException(String.format("Exception info %s %s" ,e.getMessage() , accessControl.toString()), e);
+ throw new AclPlugRuntimeException(String.format("Exception info %s %s", e.getMessage(), accessControl.toString()), e);
}
}
@@ -84,24 +86,19 @@ public abstract class AuthenticationInfoManagementAclPlugEngine implements AclPl
}
public AuthenticationInfo getAccessControl(AccessControl accessControl) {
- AuthenticationInfo existing = null;
if (accessControl.getAccount() == null && authenticationInfo != null) {
- existing = authenticationInfo.getNetaddressStrategy().match(accessControl) ? authenticationInfo : null;
+ return authenticationInfo.getNetaddressStrategy().match(accessControl) ? authenticationInfo : null;
} else {
- Map accessControlAddressMap = accessControlMap.get(accessControl.getAccount());
- if (accessControlAddressMap != null) {
- existing = accessControlAddressMap.get(accessControl.getNetaddress());
- if (existing == null)
- return null;
- if (existing.getAccessControl().getPassword().equals(accessControl.getPassword())) {
- if (existing.getNetaddressStrategy().match(accessControl)) {
- return existing;
+ List accessControlAddressList = accessControlMap.get(accessControl.getAccount());
+ if (accessControlAddressList != null) {
+ for (AuthenticationInfo ai : accessControlAddressList) {
+ if (ai.getNetaddressStrategy().match(accessControl) && ai.getAccessControl().getPassword().equals(accessControl.getPassword())) {
+ return ai;
}
}
- existing = null;
}
}
- return existing;
+ return null;
}
@Override
@@ -112,6 +109,7 @@ public abstract class AuthenticationInfoManagementAclPlugEngine implements AclPl
if (authenticationInfo != null) {
boolean boo = authentication.authentication(authenticationInfo, accessControl, authenticationResult);
authenticationResult.setSucceed(boo);
+ authenticationResult.setAccessControl(authenticationInfo.getAccessControl());
}
} catch (Exception e) {
authenticationResult.setException(e);
@@ -119,6 +117,21 @@ public abstract class AuthenticationInfoManagementAclPlugEngine implements AclPl
return authenticationResult;
}
+ public AuthenticationResult eachCheckAuthentication(AccessControl accessControl) {
+ AuthenticationResult authenticationResult = new AuthenticationResult();
+ AuthenticationInfo authenticationInfo = getAccessControl(accessControl);
+ if (authenticationInfo != null) {
+ boolean boo = authentication.authentication(authenticationInfo, accessControl, authenticationResult);
+ authenticationResult.setSucceed(boo);
+ authenticationResult.setAccessControl(authenticationInfo.getAccessControl());
+ } else {
+ authenticationResult.setResultString("accessControl is null, Please check login, password, IP\"");
+ }
+
+
+ return authenticationResult;
+ }
+
void setBorkerAccessControlTransport(BorkerAccessControlTransport transport) {
if (transport.getOnlyNetAddress() == null && (transport.getList() == null || transport.getList().size() == 0)) {
throw new AclPlugRuntimeException("onlyNetAddress and list can't be all empty");
@@ -135,5 +148,5 @@ public abstract class AuthenticationInfoManagementAclPlugEngine implements AclPl
}
protected abstract AuthenticationInfo getAuthenticationInfo(AccessControl accessControl,
- AuthenticationResult authenticationResult);
+ AuthenticationResult authenticationResult);
}
diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/LoginInfoAclPlugEngine.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/LoginInfoAclPlugEngine.java
index e8dc59c422a2cc9e5d1821fc10e1f86462561d1f..35b568349ece1ac9979636a73b6ca569bcc53f14 100644
--- a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/LoginInfoAclPlugEngine.java
+++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/LoginInfoAclPlugEngine.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.acl.plug.engine;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+
import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.entity.AuthenticationInfo;
import org.apache.rocketmq.acl.plug.entity.AuthenticationResult;
@@ -53,7 +54,7 @@ public abstract class LoginInfoAclPlugEngine extends AuthenticationInfoManagemen
}
protected AuthenticationInfo getAuthenticationInfo(AccessControl accessControl,
- AuthenticationResult authenticationResult) {
+ AuthenticationResult authenticationResult) {
LoginInfo loginInfo = getLoginInfo(accessControl);
if (loginInfo != null && loginInfo.getAuthenticationInfo() != null) {
return loginInfo.getAuthenticationInfo();
diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/PlainAclPlugEngine.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/PlainAclPlugEngine.java
index d1a7d9529b6a956b100454a0fd32bb074cd77058..bcb89b8fa288c463d190457f95882edb04a263a9 100644
--- a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/PlainAclPlugEngine.java
+++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/PlainAclPlugEngine.java
@@ -16,18 +16,19 @@
*/
package org.apache.rocketmq.acl.plug.engine;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
import org.apache.rocketmq.acl.plug.entity.BorkerAccessControlTransport;
import org.apache.rocketmq.acl.plug.entity.ControllerParameters;
import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
import org.yaml.snakeyaml.Yaml;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+
public class PlainAclPlugEngine extends LoginInfoAclPlugEngine {
public PlainAclPlugEngine(
- ControllerParameters controllerParameters) throws AclPlugRuntimeException {
+ ControllerParameters controllerParameters) throws AclPlugRuntimeException {
super(controllerParameters);
}
diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/AccessControl.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/AccessControl.java
index cf3a736a7f80f59c42145fd217491f632fbb00ce..092a97ef44bdfc2615b75d5a866c40010f5399ca 100644
--- a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/AccessControl.java
+++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/AccessControl.java
@@ -16,7 +16,9 @@
*/
package org.apache.rocketmq.acl.plug.entity;
-public class AccessControl {
+import org.apache.rocketmq.acl.AccessResource;
+
+public class AccessControl implements AccessResource {
private String account;
@@ -85,8 +87,8 @@ public class AccessControl {
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("AccessControl [account=").append(account).append(", password=").append(password)
- .append(", netaddress=").append(netaddress).append(", recognition=").append(recognition)
- .append(", code=").append(code).append(", topic=").append(topic).append("]");
+ .append(", netaddress=").append(netaddress).append(", recognition=").append(recognition)
+ .append(", code=").append(code).append(", topic=").append(topic).append("]");
return builder.toString();
}
diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/AuthenticationInfo.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/AuthenticationInfo.java
index 981bef855332e6fe8cbf7451636e901ce742f337..a1696e2e44c3d869e4c90570bbb0f2a9d6632288 100644
--- a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/AuthenticationInfo.java
+++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/AuthenticationInfo.java
@@ -16,10 +16,11 @@
*/
package org.apache.rocketmq.acl.plug.entity;
+import org.apache.rocketmq.acl.plug.strategy.NetaddressStrategy;
+
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
-import org.apache.rocketmq.acl.plug.strategy.NetaddressStrategy;
public class AuthenticationInfo {
@@ -30,7 +31,7 @@ public class AuthenticationInfo {
private Map authority;
public AuthenticationInfo(Map authority, AccessControl accessControl,
- NetaddressStrategy netaddressStrategy) {
+ NetaddressStrategy netaddressStrategy) {
super();
this.authority = authority;
this.accessControl = accessControl;
@@ -65,7 +66,7 @@ public class AuthenticationInfo {
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("AuthenticationInfo [accessControl=").append(accessControl).append(", netaddressStrategy=")
- .append(netaddressStrategy).append(", authority={");
+ .append(netaddressStrategy).append(", authority={");
Iterator> it = authority.entrySet().iterator();
while (it.hasNext()) {
Entry e = it.next();
diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/BorkerAccessControl.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/BorkerAccessControl.java
index d40fadfacbabaf0e27c7690450f8dce358dd198c..b5eb1187d2859932df50e38caaac31de4efd8af0 100644
--- a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/BorkerAccessControl.java
+++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/BorkerAccessControl.java
@@ -556,8 +556,8 @@ public class BorkerAccessControl extends AccessControl {
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("BorkerAccessControl [permitSendTopic=").append(permitSendTopic).append(", noPermitSendTopic=")
- .append(noPermitSendTopic).append(", permitPullTopic=").append(permitPullTopic)
- .append(", noPermitPullTopic=").append(noPermitPullTopic);
+ .append(noPermitSendTopic).append(", permitPullTopic=").append(permitPullTopic)
+ .append(", noPermitPullTopic=").append(noPermitPullTopic);
if (!!sendMessage)
builder.append(", sendMessage=").append(sendMessage);
if (!!sendMessageV2)
diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/ControllerParameters.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/ControllerParameters.java
index 708bcbeb9153c194a2c95dca8d1aec16913a135f..94873b5fcf155983adc460272e63cc31571cf3cb 100644
--- a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/ControllerParameters.java
+++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/ControllerParameters.java
@@ -16,11 +16,12 @@
*/
package org.apache.rocketmq.acl.plug.entity;
+import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.protocol.RequestCode;
public class ControllerParameters {
- private String fileHome;
+ private String fileHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
private Class> accessContralAnalysisClass = RequestCode.class;
@@ -44,7 +45,7 @@ public class ControllerParameters {
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("ControllerParametersEntity [fileHome=").append(fileHome).append(", accessContralAnalysisClass=")
- .append(accessContralAnalysisClass).append("]");
+ .append(accessContralAnalysisClass).append("]");
return builder.toString();
}
diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/LoginInfo.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/LoginInfo.java
index e08d7d38b1046b41a45b53b8ed391b63be511d5c..df1166be639764b9b9f682b6bb1b1a63ac7a9dfd 100644
--- a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/LoginInfo.java
+++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/LoginInfo.java
@@ -74,8 +74,8 @@ public class LoginInfo {
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("LoginInfo [recognition=").append(recognition).append(", loginTime=").append(loginTime)
- .append(", operationTime=").append(operationTime).append(", clear=").append(clear)
- .append(", authenticationInfo=").append(authenticationInfo).append("]");
+ .append(", operationTime=").append(operationTime).append(", clear=").append(clear)
+ .append(", authenticationInfo=").append(authenticationInfo).append("]");
return builder.toString();
}
diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/strategy/NetaddressStrategyFactory.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/strategy/NetaddressStrategyFactory.java
index cdb78675e95198ecc2a6a10f996880ce04000329..4be995309168fd85018b6ca3b16580f597d82b74 100644
--- a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/strategy/NetaddressStrategyFactory.java
+++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/strategy/NetaddressStrategyFactory.java
@@ -16,13 +16,14 @@
*/
package org.apache.rocketmq.acl.plug.strategy;
-import java.util.HashSet;
-import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.plug.AclUtils;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
+import java.util.HashSet;
+import java.util.Set;
+
public class NetaddressStrategyFactory {
public static final NullNetaddressStrategy NULL_NET_ADDRESS_STRATEGY = new NullNetaddressStrategy();
diff --git a/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AclRemotingServiceTest.java b/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AclRemotingServiceTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..4830d6d75e114800d1d01a4a858fc9bda6c09f90
--- /dev/null
+++ b/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AclRemotingServiceTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.plug;
+
+import java.util.HashMap;
+
+import org.apache.rocketmq.acl.AccessResource;
+import org.apache.rocketmq.acl.AccessValidator;
+import org.apache.rocketmq.acl.plug.entity.AccessControl;
+import org.apache.rocketmq.acl.plug.entity.AuthenticationResult;
+import org.apache.rocketmq.acl.plug.entity.BorkerAccessControl;
+import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;;
+
+public class AclRemotingServiceTest {
+
+
+ AclRemotingService defaultAclService;
+
+ AccessValidator accessValidator;
+
+ AccessControl accessControl;
+
+ AccessControl accessControlTwo;
+
+ @Before
+ public void init() {
+ System.setProperty("rocketmq.home.dir", "src/test/resources");
+ DefaultAclRemotingServiceImpl aclRemotingServiceImpl = new DefaultAclRemotingServiceImpl();
+ defaultAclService = aclRemotingServiceImpl;
+ accessValidator = aclRemotingServiceImpl;
+
+ accessControl = new BorkerAccessControl();
+ accessControl.setAccount("RocketMQ");
+ accessControl.setPassword("1234567");
+ accessControl.setNetaddress("192.0.0.1");
+ accessControl.setRecognition("127.0.0.1:1");
+
+ accessControlTwo = new BorkerAccessControl();
+ accessControlTwo.setAccount("RocketMQ");
+ accessControlTwo.setPassword("1234567");
+ accessControlTwo.setNetaddress("192.0.2.1");
+ accessControlTwo.setRecognition("127.0.0.1:2");
+ }
+
+
+ @Test
+ public void defaultConstructorTest() {
+ System.setProperty("rocketmq.home.dir", "src/test/resources");
+ AclRemotingService defaultAclService = new DefaultAclRemotingServiceImpl();
+ Assert.assertNotNull(defaultAclService);
+ }
+
+ @Test
+ public void parseTest() {
+ RemotingCommand remotingCommand = RemotingCommand.createResponseCommand(34, "");
+ HashMap map = new HashMap<>();
+ map.put("account", "RocketMQ");
+ map.put("password", "123456");
+ map.put("topic", "test");
+ remotingCommand.setExtFields(map);
+
+ AccessResource accessResource = accessValidator.parse(remotingCommand, "127.0.0.1:123");
+ AccessControl accessControl = (AccessControl) accessResource;
+ AccessControl newAccessControl = new AccessControl();
+ newAccessControl.setAccount("RocketMQ");
+ newAccessControl.setPassword("123456");
+ newAccessControl.setTopic("test");
+ newAccessControl.setCode(34);
+ newAccessControl.setNetaddress("127.0.0.1");
+ newAccessControl.setRecognition("127.0.0.1:123");
+ Assert.assertEquals(accessControl.toString(), newAccessControl.toString());
+ }
+
+ @Test
+ public void checkTest() {
+ accessControl.setCode(34);
+ AuthenticationResult authenticationResult = defaultAclService.check(accessControl);
+ Assert.assertTrue(authenticationResult.isSucceed());
+ }
+
+ @Test(expected = AclPlugRuntimeException.class)
+ public void checkAccessExceptionTest() {
+ accessControl.setCode(34);
+ accessControl.setAccount("Rocketmq");
+ defaultAclService.check(accessControl);
+ }
+
+ @Test(expected = AclPlugRuntimeException.class)
+ public void checkPasswordTest() {
+ accessControl.setCode(34);
+ accessControl.setPassword("123123123");
+ defaultAclService.check(accessControl);
+ }
+
+ @Test(expected = AclPlugRuntimeException.class)
+ public void checkCodeTest() {
+ accessControl.setCode(14434);
+ accessControl.setPassword("123123123");
+ defaultAclService.check(accessControl);
+ }
+
+
+ @Test
+ public void validateTest() {
+ accessControl.setCode(34);
+ accessValidator.validate(accessControl);
+ }
+
+ @Test(expected = AclPlugRuntimeException.class)
+ public void validateAccessExceptionTest() {
+ accessControl.setCode(34);
+ accessControl.setAccount("Rocketmq");
+ accessValidator.validate(accessControl);
+ }
+
+ @Test(expected = AclPlugRuntimeException.class)
+ public void validatePasswordTest() {
+ accessControl.setCode(34);
+ accessControl.setPassword("123123123");
+ accessValidator.validate(accessControl);
+ }
+
+ @Test(expected = AclPlugRuntimeException.class)
+ public void validateCodeTest() {
+ accessControl.setCode(14434);
+ accessControl.setPassword("123123123");
+ accessValidator.validate(accessControl);
+ }
+}
diff --git a/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AclUtilsTest.java b/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AclUtilsTest.java
index 806d18089497b135026ef237df31982994fc8e53..b0cc4daba10b0307b5bf099689a3be79a710f510 100644
--- a/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AclUtilsTest.java
+++ b/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AclUtilsTest.java
@@ -18,13 +18,12 @@ package org.apache.rocketmq.acl.plug;
import java.util.ArrayList;
import java.util.List;
+
import org.apache.commons.lang3.StringUtils;
import org.junit.Assert;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.junit.MockitoJUnitRunner;
-@RunWith(MockitoJUnitRunner.class)
+
public class AclUtilsTest {
@Test
diff --git a/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/engine/PlainAclPlugEngineTest.java b/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/engine/PlainAclPlugEngineTest.java
index c925ef412dab6ed7d6310327002085a46d20d990..83004bc2c269f93ba774bbee184f625f2f54bbd6 100644
--- a/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/engine/PlainAclPlugEngineTest.java
+++ b/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/engine/PlainAclPlugEngineTest.java
@@ -57,46 +57,14 @@ public class PlainAclPlugEngineTest {
@Before
public void init() throws NoSuchFieldException, SecurityException, IOException {
+ System.setProperty("rocketmq.home.dir", "src/test/resources");
+ ControllerParameters controllerParametersEntity = new ControllerParameters();
Yaml ymal = new Yaml();
- String home = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
- InputStream fis = null;
- if (home == null) {
- URL url = PlainAclPlugEngineTest.class.getResource("/");
- home = url.toString();
- home = home.substring(0, home.length() - 1).replace("file:/", "").replace("target/test-classes", "");
- home = home + "src/test/resources";
- if (!new File(home + "/conf/transport.yml").exists()) {
- home = "/home/travis/build/githublaohu/rocketmq/acl-plug/src/test/resources";
- }
- }
- String filePath = home + "/conf/transport.yml";
- try {
- fis = new FileInputStream(new File(filePath));
- transport = ymal.loadAs(fis, BorkerAccessControlTransport.class);
- }catch(Exception e) {
- AccessControl accessControl = new BorkerAccessControl();
- accessControl.setAccount("onlyNetAddress");
- accessControl.setPassword("aliyun11");
- accessControl.setNetaddress("127.0.0.1");
- accessControl.setRecognition("127.0.0.1:1");
-
- AccessControl accessControlTwo = new BorkerAccessControl();
- accessControlTwo.setAccount("listTransport");
- accessControlTwo.setPassword("aliyun1");
- accessControlTwo.setNetaddress("127.0.0.1");
- accessControlTwo.setRecognition("127.0.0.1:2");
- transport = new BorkerAccessControlTransport();
- transport.setOnlyNetAddress((BorkerAccessControl)accessControl);
-
- }
- ControllerParameters controllerParametersEntity = new ControllerParameters();
- controllerParametersEntity.setFileHome(null);
- try {
- plainAclPlugEngine = new PlainAclPlugEngine(controllerParametersEntity);
- plainAclPlugEngine.initialize();
- } catch (Exception e) {
+ transport = ymal.loadAs(new FileInputStream(new File(controllerParametersEntity.getFileHome()+"/conf/transport.yml")), BorkerAccessControlTransport.class);
+
+ plainAclPlugEngine = new PlainAclPlugEngine(controllerParametersEntity);
+ plainAclPlugEngine.initialize();
- }
accessControl = new BorkerAccessControl();
accessControl.setAccount("rokcetmq");
@@ -142,6 +110,7 @@ public class PlainAclPlugEngineTest {
@Test(expected = AclPlugRuntimeException.class)
public void testPlainAclPlugEngineInit() {
ControllerParameters controllerParametersEntity = new ControllerParameters();
+ controllerParametersEntity.setFileHome("");
new PlainAclPlugEngine(controllerParametersEntity).initialize();
}
diff --git a/broker/pom.xml b/broker/pom.xml
index c353eb32b830cbc7be701d52537462f370ea8a33..f617d2492d11c22612a535bd3f9abcd9d744c313 100644
--- a/broker/pom.xml
+++ b/broker/pom.xml
@@ -50,7 +50,7 @@
${project.groupId}
- rocketmq-acl-plug
+ rocketmq-acl
ch.qos.logback
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 7a4c105e79fcfbc87cb30a3c39bc0a587a02ecb8..a6da44b641dcb57ffff36544c5218f4bbd5daf92 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -32,7 +32,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.acl.AccessValidator;
-import org.apache.rocketmq.acl.plug.AclPlugController;
import org.apache.rocketmq.broker.client.ClientHousekeepingService;
import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
import org.apache.rocketmq.broker.client.ConsumerManager;
@@ -160,7 +159,6 @@ public class BrokerController {
private TransactionalMessageService transactionalMessageService;
private AbstractTransactionalMessageCheckListener transactionalMessageCheckListener;
- private AclPlugController aclPlugController;
public BrokerController(
final BrokerConfig brokerConfig,
@@ -510,7 +508,7 @@ public class BrokerController {
@Override
public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
- validator.validate(validator.parse(request));
+ validator.validate(validator.parse(request, remoteAddr));
}
@Override
@@ -1095,9 +1093,6 @@ public class BrokerController {
this.transactionalMessageCheckListener = transactionalMessageCheckListener;
}
- public AclPlugController getAclPlugController() {
- return this.aclPlugController;
- }
public BlockingQueue getEndTransactionThreadPoolQueue() {
return endTransactionThreadPoolQueue;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
index f4ecc2c046beeb35e8e78435e275efe716c27af6..d536db5055eb2c2b7b6d8b5f4c80af743dd81a50 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
@@ -72,9 +72,6 @@ public class ClientHousekeepingService implements ChannelEventListener {
this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);
this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
- if (this.brokerController.getAclPlugController() != null && this.brokerController.getAclPlugController().isStartSucceed()) {
- this.brokerController.getAclPlugController().doChannelCloseEvent(remoteAddr);
- }
}
@Override
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/util/ServiceProviderTest.java b/broker/src/test/java/org/apache/rocketmq/broker/util/ServiceProviderTest.java
index 22228a6e0ef2cd9231b93a7cba8ca4f1a57cd2ee..a3a35c8832dd4222a6b3d338e46b9879c4e7b1e1 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/util/ServiceProviderTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/util/ServiceProviderTest.java
@@ -17,12 +17,15 @@
package org.apache.rocketmq.broker.util;
+import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
+import java.util.List;
+
public class ServiceProviderTest {
@Test
@@ -38,4 +41,10 @@ public class ServiceProviderTest {
AbstractTransactionalMessageCheckListener.class);
assertThat(listener).isNotNull();
}
+
+ @Test
+ public void loadAccessValidatorTest() {
+ List accessValidators = ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class);
+ assertThat(accessValidators).isNotNull();
+ }
}
diff --git a/broker/src/test/resources/META-INF/service/org.apache.rocketmq.acl.AccessValidator b/broker/src/test/resources/META-INF/service/org.apache.rocketmq.acl.AccessValidator
new file mode 100644
index 0000000000000000000000000000000000000000..2f26220e5e106536a6b36e32cbf534e874e259fd
--- /dev/null
+++ b/broker/src/test/resources/META-INF/service/org.apache.rocketmq.acl.AccessValidator
@@ -0,0 +1 @@
+org.apache.rocketmq.acl.plug.DefaultAclRemotingServiceImpl
\ No newline at end of file
diff --git a/distribution/conf/broker.conf b/distribution/conf/broker.conf
index 363bcbc03a646d0f3a3380b155ae15ec284c5e68..970395735d7d9a68b796c72dc92c9acfbf40ffae 100644
--- a/distribution/conf/broker.conf
+++ b/distribution/conf/broker.conf
@@ -20,5 +20,5 @@ deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
-aclPlug=true
+enableAcl=true
namesrvAddr=127.0.0.1:9876
diff --git a/distribution/conf/transport.yml b/distribution/conf/transport.yml
index 25d4902a67f9dc5d6c0d8e0b4d543f7350c23013..f8180ede0273dbe1091c1a65214c1c7fa6ecbe67 100644
--- a/distribution/conf/transport.yml
+++ b/distribution/conf/transport.yml
@@ -14,21 +14,21 @@
# limitations under the License.
onlyNetAddress:
- netaddress: 10.10.103.*
+ netaddress: 192.168.0.*
noPermitPullTopic:
- broker-a
list:
- account: RocketMQ
password: 1234567
- netaddress: 192.0.0.*
+ netaddress: 192.168.0.*
permitSendTopic:
- - test1
+ - TopicTest
- test2
- account: RocketMQ
password: 1234567
- netaddress: 192.0.2.1
+ netaddress: 192.168.2.1
permitSendTopic:
- test3
- test4
-
\ No newline at end of file
+
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/AclClient.java b/example/src/main/java/org/apache/rocketmq/example/simple/AclClient.java
new file mode 100644
index 0000000000000000000000000000000000000000..d696c91a9290699f72c2b7b69691618a6e4d4943
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/AclClient.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.example.simple;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+/**
+ *
+ * English explain
+ * 1. broker module src/test/resources/META-INF/service/org.apache.rocketmq.acl.AccessValidator copy to src/java/resources/META-INF/service.
+ *
+ * 2. view the /conf/transport.yml file under the distribution module, pay attention to the account password, IP.
+ *
+ * 3. Modify ALC_RCP_HOOK_ACCOUT and ACL_RCP_HOOK_PASSWORD to the corresponding account password in transport.yml
+ *
+ */
+public class AclClient {
+
+ private static final Map OFFSE_TABLE = new HashMap();
+
+ private static final String ACL_RCPHOOK_ACCOUT = "RocketMQ";
+
+ private static final String ACL_RCPHOOK_PASSWORD = "1234567";
+
+ public static void main(String[] args) throws MQClientException, InterruptedException {
+ producer();
+ pushConsumer();
+ pullConsumer();
+ }
+
+ public static void producer() throws MQClientException {
+ DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", getAalRPCHook());
+ producer.setNamesrvAddr("127.0.0.1:9876");
+ producer.start();
+
+ for (int i = 0; i < 128; i++)
+ try {
+ {
+ Message msg = new Message("TopicTest",
+ "TagA",
+ "OrderID188",
+ "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
+ SendResult sendResult = producer.send(msg);
+ System.out.printf("%s%n", sendResult);
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ producer.shutdown();
+ }
+
+ public static void pushConsumer() throws MQClientException {
+
+ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_5", getAalRPCHook(), new AllocateMessageQueueAveragely());
+ consumer.setNamesrvAddr("127.0.0.1:9876");
+ consumer.subscribe("TopicTest", "*");
+ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+ //wrong time format 2017_0422_221800
+ consumer.setConsumeTimestamp("20180422221800");
+ consumer.registerMessageListener(new MessageListenerConcurrently() {
+
+ @Override
+ public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
+ System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
+ printBody(msgs);
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ }
+ });
+ consumer.start();
+ System.out.printf("Consumer Started.%n");
+ }
+
+ public static void pullConsumer() throws MQClientException {
+ DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_6", getAalRPCHook());
+ consumer.setNamesrvAddr("127.0.0.1:9876");
+ consumer.start();
+
+ Set mqs = consumer.fetchSubscribeMessageQueues("TopicTest");
+ for (MessageQueue mq : mqs) {
+ System.out.printf("Consume from the queue: %s%n", mq);
+ SINGLE_MQ:
+ while (true) {
+ try {
+ PullResult pullResult =
+ consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
+ System.out.printf("%s%n", pullResult);
+ putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
+ printBody(pullResult);
+ switch (pullResult.getPullStatus()) {
+ case FOUND:
+ break;
+ case NO_MATCHED_MSG:
+ break;
+ case NO_NEW_MSG:
+ break SINGLE_MQ;
+ case OFFSET_ILLEGAL:
+ break;
+ default:
+ break;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ consumer.shutdown();
+ }
+
+ private static void printBody(PullResult pullResult) {
+ printBody(pullResult.getMsgFoundList());
+ }
+
+ private static void printBody(List msg) {
+ if (msg == null || msg.size() == 0)
+ return;
+ for (MessageExt m : msg) {
+ if (m != null) {
+ System.out.printf("msgId : %s body : %s \n\r", m.getMsgId(), new String(m.getBody()));
+ }
+ }
+ }
+
+ private static long getMessageQueueOffset(MessageQueue mq) {
+ Long offset = OFFSE_TABLE.get(mq);
+ if (offset != null)
+ return offset;
+
+ return 0;
+ }
+
+ private static void putMessageQueueOffset(MessageQueue mq, long offset) {
+ OFFSE_TABLE.put(mq, offset);
+ }
+
+ static RPCHook getAalRPCHook() {
+ return new AalRPCHook(ACL_RCPHOOK_ACCOUT, ACL_RCPHOOK_PASSWORD);
+ }
+
+ static class AalRPCHook implements RPCHook {
+
+ private String account;
+
+ private String password;
+
+ public AalRPCHook(String account, String password) {
+ this.account = account;
+ this.password = password;
+ }
+
+ @Override
+ public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
+
+ HashMap ext = request.getExtFields();
+ if (ext == null) {
+ ext = new HashMap<>();
+ request.setExtFields(ext);
+ }
+ ext.put("account", this.account);
+ ext.put("password", this.password);
+ }
+
+ @Override
+ public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
+ // TODO Auto-generated method stub
+
+ }
+
+ }
+}
diff --git a/pom.xml b/pom.xml
index 535893c21002260509dece08a43f157b98907970..4fe56a4bc2e976e26bd7a319b58958695d0e1e8b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -525,7 +525,7 @@
${project.groupId}
- rocketmq-acl-plug
+ rocketmq-acl
${project.version}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 90f51ff0557f406719c13ed30e33efd4f29c60b4..fc9df37c652dcaef17b95a77bf2bffdaf60c9ba7 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -282,7 +282,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
@Override
public void registerRPCHook(RPCHook rpcHook) {
- if (!rpcHooks.contains(rpcHook)) {
+ if (rpcHook != null && !rpcHooks.contains(rpcHook)) {
rpcHooks.add(rpcHook);
}
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index ec34a4be7a07f9d46018e4f474b48261411d48c2..c2f3ba48d0ee3468be458f80ad60fc265f812151 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -265,7 +265,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
@Override
public void registerRPCHook(RPCHook rpcHook) {
- if (!rpcHooks.contains(rpcHook)) {
+ if (rpcHook != null && !rpcHooks.contains(rpcHook)) {
rpcHooks.add(rpcHook);
}
}
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
index 5027a3cce070397c1d49a51891ef6b9b16538fed..a05a55a06b342b468ff4fad284840cbf44101ed0 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
@@ -19,9 +19,13 @@ package org.apache.rocketmq.test.base;
import java.util.ArrayList;
import java.util.List;
+
import org.apache.log4j.Logger;
import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.namesrv.NamesrvController;
+import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
@@ -48,6 +52,7 @@ public class BaseConf {
private static Logger log = Logger.getLogger(BaseConf.class);
static {
+ System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
namesrvController = IntegrationTestBase.createAndStartNamesrv();
nsAddr = "127.0.0.1:" + namesrvController.getNettyServerConfig().getListenPort();
brokerController1 = IntegrationTestBase.createAndStartBroker(nsAddr);