diff --git a/acl-plug/pom.xml b/acl-plug/pom.xml
index 762d7a1910049f7a71e8bd16a87616d20fcc014f..d91d4203400e4606f25aed61406cf431403dd2ce 100644
--- a/acl-plug/pom.xml
+++ b/acl-plug/pom.xml
@@ -19,7 +19,7 @@
4.4.0-SNAPSHOT
rocketmq-acl
- rocketmq-acl-plug ${project.version}
+ rocketmq-acl ${project.version}
http://maven.apache.org
diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/DefaultAclRemotingServiceImpl.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/DefaultAclRemotingServiceImpl.java
index 8abf35a30b8fea9c35cb4b3880e7b51706c19f5c..eb657c0a1804c2a7d4fa7b09430b0295a4c140bb 100644
--- a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/DefaultAclRemotingServiceImpl.java
+++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/DefaultAclRemotingServiceImpl.java
@@ -61,10 +61,10 @@ public class DefaultAclRemotingServiceImpl implements AclRemotingService, Access
AccessControl accessControl = new AccessControl();
accessControl.setCode(request.getCode());
accessControl.setRecognition(remoteAddr);
+ accessControl.setNetaddress(StringUtils.split(remoteAddr, ":")[0]);
if (extFields != null) {
accessControl.setAccount(extFields.get("account"));
accessControl.setPassword(extFields.get("password"));
- accessControl.setNetaddress(StringUtils.split(remoteAddr, ":")[0]);
accessControl.setTopic(extFields.get("topic"));
}
return accessControl;
@@ -72,13 +72,17 @@ public class DefaultAclRemotingServiceImpl implements AclRemotingService, Access
@Override
public void validate(AccessResource accessResource) {
- AuthenticationResult authenticationResult = aclPlugEngine.eachCheckAuthentication((AccessControl) accessResource);
- if (authenticationResult.getException() != null) {
- throw new AclPlugRuntimeException(String.format("eachCheck the inspection appear exception, accessControl data is %s", accessResource.toString()), authenticationResult.getException());
- }
- if (authenticationResult.getAccessControl() == null || !authenticationResult.isSucceed()) {
- throw new AclPlugRuntimeException(String.format("%s accessControl data is %s", authenticationResult.getResultString(), accessResource.toString()));
- }
+ try {
+ AuthenticationResult authenticationResult = aclPlugEngine.eachCheckAuthentication((AccessControl) accessResource);
+ if (authenticationResult.getException() != null) {
+ throw new AclPlugRuntimeException(String.format("eachCheck the inspection appear exception, accessControl data is %s", accessResource.toString()), authenticationResult.getException());
+ }
+ if (authenticationResult.getAccessControl() == null || !authenticationResult.isSucceed()) {
+ throw new AclPlugRuntimeException(String.format("%s accessControl data is %s", authenticationResult.getResultString(), accessResource.toString()));
+ }
+ }catch(Exception e) {
+ throw new AclPlugRuntimeException(String.format("validate exception AccessResource data %s", accessResource.toString()) , e);
+ }
}
}
diff --git a/distribution/conf/transport.yml b/distribution/conf/transport.yml
index f86e68d411aa54fe42f62c9bd2734e334e81239a..f8180ede0273dbe1091c1a65214c1c7fa6ecbe67 100644
--- a/distribution/conf/transport.yml
+++ b/distribution/conf/transport.yml
@@ -14,20 +14,20 @@
# limitations under the License.
onlyNetAddress:
- netaddress: 127.0.0.*
+ netaddress: 192.168.0.*
noPermitPullTopic:
- broker-a
list:
- account: RocketMQ
password: 1234567
- netaddress: 192.0.0.*
+ netaddress: 192.168.0.*
permitSendTopic:
- - test1
+ - TopicTest
- test2
- account: RocketMQ
password: 1234567
- netaddress: 192.0.2.1
+ netaddress: 192.168.2.1
permitSendTopic:
- test3
- test4
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/AclClient.java b/example/src/main/java/org/apache/rocketmq/example/simple/AclClient.java
new file mode 100644
index 0000000000000000000000000000000000000000..df5e7b90be3e90b65c1a579e55d5e92df3e45617
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/AclClient.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.example.simple;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+/**
+ * 1. 把broker模块src/test/resources/META-INF/service/org.apache.rocketmq.acl.AccessValidator 复制到src/java/resources/META-INF/service
+ * 2. 查看distribution模块下 /conf/transport.yml文件,注意里面的账户密码,ip
+ * 3. 把ALC_RCP_HOOK_ACCOUT与ACL_RCP_HOOK_PASSWORD 修改成transport.yml里面对应的账户密码
+ * @author laohu
+ *
+ */
+public class AclClient {
+
+ private static final Map OFFSE_TABLE = new HashMap();
+
+ private static String ALC_RCP_HOOK_ACCOUT = "RocketMQ";
+
+ private static String ACL_RCP_HOOK_PASSWORD = "1234567";
+
+
+
+ public static void main(String[] args) throws MQClientException, InterruptedException {
+ producer();
+ pushConsumer();
+ pullConsumer();
+ }
+
+ public static void producer() throws MQClientException {
+ DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",getAalRPCHook());
+ producer.setNamesrvAddr("127.0.0.1:9876");
+ producer.start();
+
+ for (int i = 0; i < 128; i++)
+ try {
+ {
+ Message msg = new Message("TopicTest",
+ "TagA",
+ "OrderID188",
+ "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
+ SendResult sendResult = producer.send(msg);
+ System.out.printf("%s%n", sendResult);
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ producer.shutdown();
+ }
+
+ public static void pushConsumer() throws MQClientException {
+
+
+ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_5" , getAalRPCHook(),new AllocateMessageQueueAveragely());
+ consumer.setNamesrvAddr("127.0.0.1:9876");
+ consumer.subscribe("TopicTest", "*");
+ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+ //wrong time format 2017_0422_221800
+ consumer.setConsumeTimestamp("20180422221800");
+ consumer.registerMessageListener(new MessageListenerConcurrently() {
+
+ @Override
+ public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
+ System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
+ printBody(msgs);
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ }
+ });
+ consumer.start();
+ System.out.printf("Consumer Started.%n");
+ }
+
+ public static void pullConsumer() throws MQClientException {
+ DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_6" , getAalRPCHook());
+ consumer.setNamesrvAddr("127.0.0.1:9876");
+ consumer.start();
+
+ Set mqs = consumer.fetchSubscribeMessageQueues("TopicTest");
+ for (MessageQueue mq : mqs) {
+ System.out.printf("Consume from the queue: %s%n", mq);
+ SINGLE_MQ:
+ while (true) {
+ try {
+ PullResult pullResult =
+ consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
+ System.out.printf("%s%n", pullResult);
+ putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
+ printBody(pullResult);
+ switch (pullResult.getPullStatus()) {
+ case FOUND:
+ break;
+ case NO_MATCHED_MSG:
+ break;
+ case NO_NEW_MSG:
+ break SINGLE_MQ;
+ case OFFSET_ILLEGAL:
+ break;
+ default:
+ break;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ consumer.shutdown();
+ }
+
+ private static void printBody(PullResult pullResult) {
+ printBody(pullResult.getMsgFoundList());
+ }
+
+ private static void printBody(List msg) {
+ if(msg == null || msg.size() == 0)
+ return;
+ for(MessageExt m : msg) {
+ if(m != null) {
+ System.out.printf("msgId : %s body : %s",m.getMsgId() , new String(m.getBody()));
+ System.out.println();
+ }
+ }
+ }
+
+ private static long getMessageQueueOffset(MessageQueue mq) {
+ Long offset = OFFSE_TABLE.get(mq);
+ if (offset != null)
+ return offset;
+
+ return 0;
+ }
+
+ private static void putMessageQueueOffset(MessageQueue mq, long offset) {
+ OFFSE_TABLE.put(mq, offset);
+ }
+
+ static RPCHook getAalRPCHook() {
+ return new AalRPCHook(ALC_RCP_HOOK_ACCOUT, ACL_RCP_HOOK_PASSWORD);
+ }
+
+
+ static class AalRPCHook implements RPCHook{
+
+ private String account;
+
+ private String password;
+
+ public AalRPCHook(String account , String password) {
+ this.account = account;
+ this.password = password;
+ }
+
+ @Override
+ public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
+
+ HashMap ext = request.getExtFields();
+ if(ext == null) {
+ ext = new HashMap<>();
+ request.setExtFields(ext);
+ }
+ ext.put("account", this.account);
+ ext.put("password", this.password);
+ }
+
+ @Override
+ public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
+ // TODO Auto-generated method stub
+
+ }
+
+ }
+}