提交 94403714 编写于 作者: L laohu

add acl use example. AclClient.java

上级 d23d2f75
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
<version>4.4.0-SNAPSHOT</version> <version>4.4.0-SNAPSHOT</version>
</parent> </parent>
<artifactId>rocketmq-acl</artifactId> <artifactId>rocketmq-acl</artifactId>
<name>rocketmq-acl-plug ${project.version}</name> <name>rocketmq-acl ${project.version}</name>
<url>http://maven.apache.org</url> <url>http://maven.apache.org</url>
<properties> <properties>
......
...@@ -61,10 +61,10 @@ public class DefaultAclRemotingServiceImpl implements AclRemotingService, Access ...@@ -61,10 +61,10 @@ public class DefaultAclRemotingServiceImpl implements AclRemotingService, Access
AccessControl accessControl = new AccessControl(); AccessControl accessControl = new AccessControl();
accessControl.setCode(request.getCode()); accessControl.setCode(request.getCode());
accessControl.setRecognition(remoteAddr); accessControl.setRecognition(remoteAddr);
accessControl.setNetaddress(StringUtils.split(remoteAddr, ":")[0]);
if (extFields != null) { if (extFields != null) {
accessControl.setAccount(extFields.get("account")); accessControl.setAccount(extFields.get("account"));
accessControl.setPassword(extFields.get("password")); accessControl.setPassword(extFields.get("password"));
accessControl.setNetaddress(StringUtils.split(remoteAddr, ":")[0]);
accessControl.setTopic(extFields.get("topic")); accessControl.setTopic(extFields.get("topic"));
} }
return accessControl; return accessControl;
...@@ -72,13 +72,17 @@ public class DefaultAclRemotingServiceImpl implements AclRemotingService, Access ...@@ -72,13 +72,17 @@ public class DefaultAclRemotingServiceImpl implements AclRemotingService, Access
@Override @Override
public void validate(AccessResource accessResource) { public void validate(AccessResource accessResource) {
AuthenticationResult authenticationResult = aclPlugEngine.eachCheckAuthentication((AccessControl) accessResource); try {
if (authenticationResult.getException() != null) { AuthenticationResult authenticationResult = aclPlugEngine.eachCheckAuthentication((AccessControl) accessResource);
throw new AclPlugRuntimeException(String.format("eachCheck the inspection appear exception, accessControl data is %s", accessResource.toString()), authenticationResult.getException()); if (authenticationResult.getException() != null) {
} throw new AclPlugRuntimeException(String.format("eachCheck the inspection appear exception, accessControl data is %s", accessResource.toString()), authenticationResult.getException());
if (authenticationResult.getAccessControl() == null || !authenticationResult.isSucceed()) { }
throw new AclPlugRuntimeException(String.format("%s accessControl data is %s", authenticationResult.getResultString(), accessResource.toString())); if (authenticationResult.getAccessControl() == null || !authenticationResult.isSucceed()) {
} throw new AclPlugRuntimeException(String.format("%s accessControl data is %s", authenticationResult.getResultString(), accessResource.toString()));
}
}catch(Exception e) {
throw new AclPlugRuntimeException(String.format("validate exception AccessResource data %s", accessResource.toString()) , e);
}
} }
} }
...@@ -14,20 +14,20 @@ ...@@ -14,20 +14,20 @@
# limitations under the License. # limitations under the License.
onlyNetAddress: onlyNetAddress:
netaddress: 127.0.0.* netaddress: 192.168.0.*
noPermitPullTopic: noPermitPullTopic:
- broker-a - broker-a
list: list:
- account: RocketMQ - account: RocketMQ
password: 1234567 password: 1234567
netaddress: 192.0.0.* netaddress: 192.168.0.*
permitSendTopic: permitSendTopic:
- test1 - TopicTest
- test2 - test2
- account: RocketMQ - account: RocketMQ
password: 1234567 password: 1234567
netaddress: 192.0.2.1 netaddress: 192.168.2.1
permitSendTopic: permitSendTopic:
- test3 - test3
- test4 - test4
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.simple;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
/**
* 1. 把broker模块src/test/resources/META-INF/service/org.apache.rocketmq.acl.AccessValidator 复制到src/java/resources/META-INF/service
* 2. 查看distribution模块下 /conf/transport.yml文件,注意里面的账户密码,ip
* 3. 把ALC_RCP_HOOK_ACCOUT与ACL_RCP_HOOK_PASSWORD 修改成transport.yml里面对应的账户密码
* @author laohu
*
*/
public class AclClient {
private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();
private static String ALC_RCP_HOOK_ACCOUT = "RocketMQ";
private static String ACL_RCP_HOOK_PASSWORD = "1234567";
public static void main(String[] args) throws MQClientException, InterruptedException {
producer();
pushConsumer();
pullConsumer();
}
public static void producer() throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",getAalRPCHook());
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 128; i++)
try {
{
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
public static void pushConsumer() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_5" , getAalRPCHook(),new AllocateMessageQueueAveragely());
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//wrong time format 2017_0422_221800
consumer.setConsumeTimestamp("20180422221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
printBody(msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
public static void pullConsumer() throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_6" , getAalRPCHook());
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.start();
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");
for (MessageQueue mq : mqs) {
System.out.printf("Consume from the queue: %s%n", mq);
SINGLE_MQ:
while (true) {
try {
PullResult pullResult =
consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
System.out.printf("%s%n", pullResult);
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
printBody(pullResult);
switch (pullResult.getPullStatus()) {
case FOUND:
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break SINGLE_MQ;
case OFFSET_ILLEGAL:
break;
default:
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
consumer.shutdown();
}
private static void printBody(PullResult pullResult) {
printBody(pullResult.getMsgFoundList());
}
private static void printBody(List<MessageExt> msg) {
if(msg == null || msg.size() == 0)
return;
for(MessageExt m : msg) {
if(m != null) {
System.out.printf("msgId : %s body : %s",m.getMsgId() , new String(m.getBody()));
System.out.println();
}
}
}
private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = OFFSE_TABLE.get(mq);
if (offset != null)
return offset;
return 0;
}
private static void putMessageQueueOffset(MessageQueue mq, long offset) {
OFFSE_TABLE.put(mq, offset);
}
static RPCHook getAalRPCHook() {
return new AalRPCHook(ALC_RCP_HOOK_ACCOUT, ACL_RCP_HOOK_PASSWORD);
}
static class AalRPCHook implements RPCHook{
private String account;
private String password;
public AalRPCHook(String account , String password) {
this.account = account;
this.password = password;
}
@Override
public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
HashMap<String, String> ext = request.getExtFields();
if(ext == null) {
ext = new HashMap<>();
request.setExtFields(ext);
}
ext.put("account", this.account);
ext.put("password", this.password);
}
@Override
public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
// TODO Auto-generated method stub
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册