From 94403714ee1ca4c9640171c28bc6c275e0525729 Mon Sep 17 00:00:00 2001 From: laohu <8wy118611@163.com> Date: Thu, 8 Nov 2018 13:03:03 +0800 Subject: [PATCH] add acl use example. AclClient.java --- acl-plug/pom.xml | 2 +- .../plug/DefaultAclRemotingServiceImpl.java | 20 +- distribution/conf/transport.yml | 8 +- .../rocketmq/example/simple/AclClient.java | 209 ++++++++++++++++++ 4 files changed, 226 insertions(+), 13 deletions(-) create mode 100644 example/src/main/java/org/apache/rocketmq/example/simple/AclClient.java diff --git a/acl-plug/pom.xml b/acl-plug/pom.xml index 762d7a19..d91d4203 100644 --- a/acl-plug/pom.xml +++ b/acl-plug/pom.xml @@ -19,7 +19,7 @@ 4.4.0-SNAPSHOT rocketmq-acl - rocketmq-acl-plug ${project.version} + rocketmq-acl ${project.version} http://maven.apache.org diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/DefaultAclRemotingServiceImpl.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/DefaultAclRemotingServiceImpl.java index 8abf35a3..eb657c0a 100644 --- a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/DefaultAclRemotingServiceImpl.java +++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/DefaultAclRemotingServiceImpl.java @@ -61,10 +61,10 @@ public class DefaultAclRemotingServiceImpl implements AclRemotingService, Access AccessControl accessControl = new AccessControl(); accessControl.setCode(request.getCode()); accessControl.setRecognition(remoteAddr); + accessControl.setNetaddress(StringUtils.split(remoteAddr, ":")[0]); if (extFields != null) { accessControl.setAccount(extFields.get("account")); accessControl.setPassword(extFields.get("password")); - accessControl.setNetaddress(StringUtils.split(remoteAddr, ":")[0]); accessControl.setTopic(extFields.get("topic")); } return accessControl; @@ -72,13 +72,17 @@ public class DefaultAclRemotingServiceImpl implements AclRemotingService, Access @Override public void validate(AccessResource accessResource) { - AuthenticationResult authenticationResult = aclPlugEngine.eachCheckAuthentication((AccessControl) accessResource); - if (authenticationResult.getException() != null) { - throw new AclPlugRuntimeException(String.format("eachCheck the inspection appear exception, accessControl data is %s", accessResource.toString()), authenticationResult.getException()); - } - if (authenticationResult.getAccessControl() == null || !authenticationResult.isSucceed()) { - throw new AclPlugRuntimeException(String.format("%s accessControl data is %s", authenticationResult.getResultString(), accessResource.toString())); - } + try { + AuthenticationResult authenticationResult = aclPlugEngine.eachCheckAuthentication((AccessControl) accessResource); + if (authenticationResult.getException() != null) { + throw new AclPlugRuntimeException(String.format("eachCheck the inspection appear exception, accessControl data is %s", accessResource.toString()), authenticationResult.getException()); + } + if (authenticationResult.getAccessControl() == null || !authenticationResult.isSucceed()) { + throw new AclPlugRuntimeException(String.format("%s accessControl data is %s", authenticationResult.getResultString(), accessResource.toString())); + } + }catch(Exception e) { + throw new AclPlugRuntimeException(String.format("validate exception AccessResource data %s", accessResource.toString()) , e); + } } } diff --git a/distribution/conf/transport.yml b/distribution/conf/transport.yml index f86e68d4..f8180ede 100644 --- a/distribution/conf/transport.yml +++ b/distribution/conf/transport.yml @@ -14,20 +14,20 @@ # limitations under the License. onlyNetAddress: - netaddress: 127.0.0.* + netaddress: 192.168.0.* noPermitPullTopic: - broker-a list: - account: RocketMQ password: 1234567 - netaddress: 192.0.0.* + netaddress: 192.168.0.* permitSendTopic: - - test1 + - TopicTest - test2 - account: RocketMQ password: 1234567 - netaddress: 192.0.2.1 + netaddress: 192.168.2.1 permitSendTopic: - test3 - test4 diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/AclClient.java b/example/src/main/java/org/apache/rocketmq/example/simple/AclClient.java new file mode 100644 index 00000000..df5e7b90 --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/simple/AclClient.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.example.simple; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; + +/** + * 1. 把broker模块src/test/resources/META-INF/service/org.apache.rocketmq.acl.AccessValidator 复制到src/java/resources/META-INF/service + * 2. 查看distribution模块下 /conf/transport.yml文件,注意里面的账户密码,ip + * 3. 把ALC_RCP_HOOK_ACCOUT与ACL_RCP_HOOK_PASSWORD 修改成transport.yml里面对应的账户密码 + * @author laohu + * + */ +public class AclClient { + + private static final Map OFFSE_TABLE = new HashMap(); + + private static String ALC_RCP_HOOK_ACCOUT = "RocketMQ"; + + private static String ACL_RCP_HOOK_PASSWORD = "1234567"; + + + + public static void main(String[] args) throws MQClientException, InterruptedException { + producer(); + pushConsumer(); + pullConsumer(); + } + + public static void producer() throws MQClientException { + DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",getAalRPCHook()); + producer.setNamesrvAddr("127.0.0.1:9876"); + producer.start(); + + for (int i = 0; i < 128; i++) + try { + { + Message msg = new Message("TopicTest", + "TagA", + "OrderID188", + "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); + SendResult sendResult = producer.send(msg); + System.out.printf("%s%n", sendResult); + } + + } catch (Exception e) { + e.printStackTrace(); + } + + producer.shutdown(); + } + + public static void pushConsumer() throws MQClientException { + + + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_5" , getAalRPCHook(),new AllocateMessageQueueAveragely()); + consumer.setNamesrvAddr("127.0.0.1:9876"); + consumer.subscribe("TopicTest", "*"); + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + //wrong time format 2017_0422_221800 + consumer.setConsumeTimestamp("20180422221800"); + consumer.registerMessageListener(new MessageListenerConcurrently() { + + @Override + public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { + System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); + printBody(msgs); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + }); + consumer.start(); + System.out.printf("Consumer Started.%n"); + } + + public static void pullConsumer() throws MQClientException { + DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_6" , getAalRPCHook()); + consumer.setNamesrvAddr("127.0.0.1:9876"); + consumer.start(); + + Set mqs = consumer.fetchSubscribeMessageQueues("TopicTest"); + for (MessageQueue mq : mqs) { + System.out.printf("Consume from the queue: %s%n", mq); + SINGLE_MQ: + while (true) { + try { + PullResult pullResult = + consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); + System.out.printf("%s%n", pullResult); + putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); + printBody(pullResult); + switch (pullResult.getPullStatus()) { + case FOUND: + break; + case NO_MATCHED_MSG: + break; + case NO_NEW_MSG: + break SINGLE_MQ; + case OFFSET_ILLEGAL: + break; + default: + break; + } + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + consumer.shutdown(); + } + + private static void printBody(PullResult pullResult) { + printBody(pullResult.getMsgFoundList()); + } + + private static void printBody(List msg) { + if(msg == null || msg.size() == 0) + return; + for(MessageExt m : msg) { + if(m != null) { + System.out.printf("msgId : %s body : %s",m.getMsgId() , new String(m.getBody())); + System.out.println(); + } + } + } + + private static long getMessageQueueOffset(MessageQueue mq) { + Long offset = OFFSE_TABLE.get(mq); + if (offset != null) + return offset; + + return 0; + } + + private static void putMessageQueueOffset(MessageQueue mq, long offset) { + OFFSE_TABLE.put(mq, offset); + } + + static RPCHook getAalRPCHook() { + return new AalRPCHook(ALC_RCP_HOOK_ACCOUT, ACL_RCP_HOOK_PASSWORD); + } + + + static class AalRPCHook implements RPCHook{ + + private String account; + + private String password; + + public AalRPCHook(String account , String password) { + this.account = account; + this.password = password; + } + + @Override + public void doBeforeRequest(String remoteAddr, RemotingCommand request) { + + HashMap ext = request.getExtFields(); + if(ext == null) { + ext = new HashMap<>(); + request.setExtFields(ext); + } + ext.put("account", this.account); + ext.put("password", this.password); + } + + @Override + public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) { + // TODO Auto-generated method stub + + } + + } +} -- GitLab