diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/DefaultAclRemotingServiceImpl.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/DefaultAclRemotingServiceImpl.java index eb657c0a1804c2a7d4fa7b09430b0295a4c140bb..0d5f949c987fc8e5fac08c8b44de6922fd0586c0 100644 --- a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/DefaultAclRemotingServiceImpl.java +++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/DefaultAclRemotingServiceImpl.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.acl.plug; +import java.util.HashMap; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.acl.AccessResource; import org.apache.rocketmq.acl.AccessValidator; @@ -27,8 +28,6 @@ import org.apache.rocketmq.acl.plug.entity.ControllerParameters; import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import java.util.HashMap; - public class DefaultAclRemotingServiceImpl implements AclRemotingService, AccessValidator { private AclPlugEngine aclPlugEngine; @@ -72,17 +71,17 @@ public class DefaultAclRemotingServiceImpl implements AclRemotingService, Access @Override public void validate(AccessResource accessResource) { - try { - AuthenticationResult authenticationResult = aclPlugEngine.eachCheckAuthentication((AccessControl) accessResource); - if (authenticationResult.getException() != null) { - throw new AclPlugRuntimeException(String.format("eachCheck the inspection appear exception, accessControl data is %s", accessResource.toString()), authenticationResult.getException()); - } - if (authenticationResult.getAccessControl() == null || !authenticationResult.isSucceed()) { - throw new AclPlugRuntimeException(String.format("%s accessControl data is %s", authenticationResult.getResultString(), accessResource.toString())); - } - }catch(Exception e) { - throw new AclPlugRuntimeException(String.format("validate exception AccessResource data %s", accessResource.toString()) , e); - } + try { + AuthenticationResult authenticationResult = aclPlugEngine.eachCheckAuthentication((AccessControl) accessResource); + if (authenticationResult.getException() != null) { + throw new AclPlugRuntimeException(String.format("eachCheck the inspection appear exception, accessControl data is %s", accessResource.toString()), authenticationResult.getException()); + } + if (authenticationResult.getAccessControl() == null || !authenticationResult.isSucceed()) { + throw new AclPlugRuntimeException(String.format("%s accessControl data is %s", authenticationResult.getResultString(), accessResource.toString())); + } + } catch (Exception e) { + throw new AclPlugRuntimeException(String.format("validate exception AccessResource data %s", accessResource.toString()), e); + } } } diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/AclClient.java b/example/src/main/java/org/apache/rocketmq/example/simple/AclClient.java index df5e7b90be3e90b65c1a579e55d5e92df3e45617..d696c91a9290699f72c2b7b69691618a6e4d4943 100644 --- a/example/src/main/java/org/apache/rocketmq/example/simple/AclClient.java +++ b/example/src/main/java/org/apache/rocketmq/example/simple/AclClient.java @@ -20,7 +20,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; - import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.PullResult; @@ -40,170 +39,168 @@ import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.protocol.RemotingCommand; /** - * 1. 把broker模块src/test/resources/META-INF/service/org.apache.rocketmq.acl.AccessValidator 复制到src/java/resources/META-INF/service - * 2. 查看distribution模块下 /conf/transport.yml文件,注意里面的账户密码,ip - * 3. 把ALC_RCP_HOOK_ACCOUT与ACL_RCP_HOOK_PASSWORD 修改成transport.yml里面对应的账户密码 - * @author laohu + * + * English explain + * 1. broker module src/test/resources/META-INF/service/org.apache.rocketmq.acl.AccessValidator copy to src/java/resources/META-INF/service. + * + * 2. view the /conf/transport.yml file under the distribution module, pay attention to the account password, IP. + * + * 3. Modify ALC_RCP_HOOK_ACCOUT and ACL_RCP_HOOK_PASSWORD to the corresponding account password in transport.yml * */ public class AclClient { - - private static final Map OFFSE_TABLE = new HashMap(); - - private static String ALC_RCP_HOOK_ACCOUT = "RocketMQ"; - - private static String ACL_RCP_HOOK_PASSWORD = "1234567"; - - - - public static void main(String[] args) throws MQClientException, InterruptedException { - producer(); - pushConsumer(); - pullConsumer(); - } - - public static void producer() throws MQClientException { - DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",getAalRPCHook()); - producer.setNamesrvAddr("127.0.0.1:9876"); - producer.start(); - - for (int i = 0; i < 128; i++) - try { - { - Message msg = new Message("TopicTest", - "TagA", - "OrderID188", - "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); - SendResult sendResult = producer.send(msg); - System.out.printf("%s%n", sendResult); - } - - } catch (Exception e) { - e.printStackTrace(); - } - - producer.shutdown(); - } - - public static void pushConsumer() throws MQClientException { - - - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_5" , getAalRPCHook(),new AllocateMessageQueueAveragely()); - consumer.setNamesrvAddr("127.0.0.1:9876"); - consumer.subscribe("TopicTest", "*"); - consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); - //wrong time format 2017_0422_221800 - consumer.setConsumeTimestamp("20180422221800"); - consumer.registerMessageListener(new MessageListenerConcurrently() { - - @Override - public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { - System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); - printBody(msgs); - return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; - } - }); - consumer.start(); - System.out.printf("Consumer Started.%n"); - } - - public static void pullConsumer() throws MQClientException { - DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_6" , getAalRPCHook()); - consumer.setNamesrvAddr("127.0.0.1:9876"); - consumer.start(); - - Set mqs = consumer.fetchSubscribeMessageQueues("TopicTest"); - for (MessageQueue mq : mqs) { - System.out.printf("Consume from the queue: %s%n", mq); - SINGLE_MQ: - while (true) { - try { - PullResult pullResult = - consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); - System.out.printf("%s%n", pullResult); - putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); - printBody(pullResult); - switch (pullResult.getPullStatus()) { - case FOUND: - break; - case NO_MATCHED_MSG: - break; - case NO_NEW_MSG: - break SINGLE_MQ; - case OFFSET_ILLEGAL: - break; - default: - break; - } - } catch (Exception e) { - e.printStackTrace(); - } - } - } - - consumer.shutdown(); - } - - private static void printBody(PullResult pullResult) { - printBody(pullResult.getMsgFoundList()); - } - - private static void printBody(List msg) { - if(msg == null || msg.size() == 0) - return; - for(MessageExt m : msg) { - if(m != null) { - System.out.printf("msgId : %s body : %s",m.getMsgId() , new String(m.getBody())); - System.out.println(); - } - } - } - - private static long getMessageQueueOffset(MessageQueue mq) { - Long offset = OFFSE_TABLE.get(mq); - if (offset != null) - return offset; - - return 0; - } - - private static void putMessageQueueOffset(MessageQueue mq, long offset) { - OFFSE_TABLE.put(mq, offset); - } - - static RPCHook getAalRPCHook() { - return new AalRPCHook(ALC_RCP_HOOK_ACCOUT, ACL_RCP_HOOK_PASSWORD); - } - - - static class AalRPCHook implements RPCHook{ - - private String account; - - private String password; - - public AalRPCHook(String account , String password) { - this.account = account; - this.password = password; - } - - @Override - public void doBeforeRequest(String remoteAddr, RemotingCommand request) { - - HashMap ext = request.getExtFields(); - if(ext == null) { - ext = new HashMap<>(); - request.setExtFields(ext); - } - ext.put("account", this.account); - ext.put("password", this.password); - } - - @Override - public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) { - // TODO Auto-generated method stub - - } - - } + + private static final Map OFFSE_TABLE = new HashMap(); + + private static final String ACL_RCPHOOK_ACCOUT = "RocketMQ"; + + private static final String ACL_RCPHOOK_PASSWORD = "1234567"; + + public static void main(String[] args) throws MQClientException, InterruptedException { + producer(); + pushConsumer(); + pullConsumer(); + } + + public static void producer() throws MQClientException { + DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", getAalRPCHook()); + producer.setNamesrvAddr("127.0.0.1:9876"); + producer.start(); + + for (int i = 0; i < 128; i++) + try { + { + Message msg = new Message("TopicTest", + "TagA", + "OrderID188", + "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); + SendResult sendResult = producer.send(msg); + System.out.printf("%s%n", sendResult); + } + + } catch (Exception e) { + e.printStackTrace(); + } + + producer.shutdown(); + } + + public static void pushConsumer() throws MQClientException { + + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_5", getAalRPCHook(), new AllocateMessageQueueAveragely()); + consumer.setNamesrvAddr("127.0.0.1:9876"); + consumer.subscribe("TopicTest", "*"); + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + //wrong time format 2017_0422_221800 + consumer.setConsumeTimestamp("20180422221800"); + consumer.registerMessageListener(new MessageListenerConcurrently() { + + @Override + public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { + System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); + printBody(msgs); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + }); + consumer.start(); + System.out.printf("Consumer Started.%n"); + } + + public static void pullConsumer() throws MQClientException { + DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_6", getAalRPCHook()); + consumer.setNamesrvAddr("127.0.0.1:9876"); + consumer.start(); + + Set mqs = consumer.fetchSubscribeMessageQueues("TopicTest"); + for (MessageQueue mq : mqs) { + System.out.printf("Consume from the queue: %s%n", mq); + SINGLE_MQ: + while (true) { + try { + PullResult pullResult = + consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); + System.out.printf("%s%n", pullResult); + putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); + printBody(pullResult); + switch (pullResult.getPullStatus()) { + case FOUND: + break; + case NO_MATCHED_MSG: + break; + case NO_NEW_MSG: + break SINGLE_MQ; + case OFFSET_ILLEGAL: + break; + default: + break; + } + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + consumer.shutdown(); + } + + private static void printBody(PullResult pullResult) { + printBody(pullResult.getMsgFoundList()); + } + + private static void printBody(List msg) { + if (msg == null || msg.size() == 0) + return; + for (MessageExt m : msg) { + if (m != null) { + System.out.printf("msgId : %s body : %s \n\r", m.getMsgId(), new String(m.getBody())); + } + } + } + + private static long getMessageQueueOffset(MessageQueue mq) { + Long offset = OFFSE_TABLE.get(mq); + if (offset != null) + return offset; + + return 0; + } + + private static void putMessageQueueOffset(MessageQueue mq, long offset) { + OFFSE_TABLE.put(mq, offset); + } + + static RPCHook getAalRPCHook() { + return new AalRPCHook(ACL_RCPHOOK_ACCOUT, ACL_RCPHOOK_PASSWORD); + } + + static class AalRPCHook implements RPCHook { + + private String account; + + private String password; + + public AalRPCHook(String account, String password) { + this.account = account; + this.password = password; + } + + @Override + public void doBeforeRequest(String remoteAddr, RemotingCommand request) { + + HashMap ext = request.getExtFields(); + if (ext == null) { + ext = new HashMap<>(); + request.setExtFields(ext); + } + ext.put("account", this.account); + ext.put("password", this.password); + } + + @Override + public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) { + // TODO Auto-generated method stub + + } + + } }