提交 11d3df66 编写于 作者: L laohu

add acl use example. AclClient.java

上级 94403714
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
*/ */
package org.apache.rocketmq.acl.plug; package org.apache.rocketmq.acl.plug;
import java.util.HashMap;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.AccessResource; import org.apache.rocketmq.acl.AccessResource;
import org.apache.rocketmq.acl.AccessValidator; import org.apache.rocketmq.acl.AccessValidator;
...@@ -27,8 +28,6 @@ import org.apache.rocketmq.acl.plug.entity.ControllerParameters; ...@@ -27,8 +28,6 @@ import org.apache.rocketmq.acl.plug.entity.ControllerParameters;
import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException; import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import java.util.HashMap;
public class DefaultAclRemotingServiceImpl implements AclRemotingService, AccessValidator { public class DefaultAclRemotingServiceImpl implements AclRemotingService, AccessValidator {
private AclPlugEngine aclPlugEngine; private AclPlugEngine aclPlugEngine;
...@@ -72,17 +71,17 @@ public class DefaultAclRemotingServiceImpl implements AclRemotingService, Access ...@@ -72,17 +71,17 @@ public class DefaultAclRemotingServiceImpl implements AclRemotingService, Access
@Override @Override
public void validate(AccessResource accessResource) { public void validate(AccessResource accessResource) {
try { try {
AuthenticationResult authenticationResult = aclPlugEngine.eachCheckAuthentication((AccessControl) accessResource); AuthenticationResult authenticationResult = aclPlugEngine.eachCheckAuthentication((AccessControl) accessResource);
if (authenticationResult.getException() != null) { if (authenticationResult.getException() != null) {
throw new AclPlugRuntimeException(String.format("eachCheck the inspection appear exception, accessControl data is %s", accessResource.toString()), authenticationResult.getException()); throw new AclPlugRuntimeException(String.format("eachCheck the inspection appear exception, accessControl data is %s", accessResource.toString()), authenticationResult.getException());
} }
if (authenticationResult.getAccessControl() == null || !authenticationResult.isSucceed()) { if (authenticationResult.getAccessControl() == null || !authenticationResult.isSucceed()) {
throw new AclPlugRuntimeException(String.format("%s accessControl data is %s", authenticationResult.getResultString(), accessResource.toString())); throw new AclPlugRuntimeException(String.format("%s accessControl data is %s", authenticationResult.getResultString(), accessResource.toString()));
} }
}catch(Exception e) { } catch (Exception e) {
throw new AclPlugRuntimeException(String.format("validate exception AccessResource data %s", accessResource.toString()) , e); throw new AclPlugRuntimeException(String.format("validate exception AccessResource data %s", accessResource.toString()), e);
} }
} }
} }
...@@ -20,7 +20,6 @@ import java.util.HashMap; ...@@ -20,7 +20,6 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.consumer.PullResult;
...@@ -40,170 +39,168 @@ import org.apache.rocketmq.remoting.common.RemotingHelper; ...@@ -40,170 +39,168 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
/** /**
* 1. 把broker模块src/test/resources/META-INF/service/org.apache.rocketmq.acl.AccessValidator 复制到src/java/resources/META-INF/service *
* 2. 查看distribution模块下 /conf/transport.yml文件,注意里面的账户密码,ip * English explain
* 3. 把ALC_RCP_HOOK_ACCOUT与ACL_RCP_HOOK_PASSWORD 修改成transport.yml里面对应的账户密码 * 1. broker module src/test/resources/META-INF/service/org.apache.rocketmq.acl.AccessValidator copy to src/java/resources/META-INF/service.
* @author laohu *
* 2. view the /conf/transport.yml file under the distribution module, pay attention to the account password, IP.
*
* 3. Modify ALC_RCP_HOOK_ACCOUT and ACL_RCP_HOOK_PASSWORD to the corresponding account password in transport.yml
* *
*/ */
public class AclClient { public class AclClient {
private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>(); private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();
private static String ALC_RCP_HOOK_ACCOUT = "RocketMQ"; private static final String ACL_RCPHOOK_ACCOUT = "RocketMQ";
private static String ACL_RCP_HOOK_PASSWORD = "1234567"; private static final String ACL_RCPHOOK_PASSWORD = "1234567";
public static void main(String[] args) throws MQClientException, InterruptedException {
producer();
public static void main(String[] args) throws MQClientException, InterruptedException { pushConsumer();
producer(); pullConsumer();
pushConsumer(); }
pullConsumer();
} public static void producer() throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", getAalRPCHook());
public static void producer() throws MQClientException { producer.setNamesrvAddr("127.0.0.1:9876");
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",getAalRPCHook()); producer.start();
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start(); for (int i = 0; i < 128; i++)
try {
for (int i = 0; i < 128; i++) {
try { Message msg = new Message("TopicTest",
{ "TagA",
Message msg = new Message("TopicTest", "OrderID188",
"TagA", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
"OrderID188", SendResult sendResult = producer.send(msg);
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); System.out.printf("%s%n", sendResult);
SendResult sendResult = producer.send(msg); }
System.out.printf("%s%n", sendResult);
} } catch (Exception e) {
e.printStackTrace();
} catch (Exception e) { }
e.printStackTrace();
} producer.shutdown();
}
producer.shutdown();
} public static void pushConsumer() throws MQClientException {
public static void pushConsumer() throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_5", getAalRPCHook(), new AllocateMessageQueueAveragely());
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("TopicTest", "*");
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_5" , getAalRPCHook(),new AllocateMessageQueueAveragely()); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setNamesrvAddr("127.0.0.1:9876"); //wrong time format 2017_0422_221800
consumer.subscribe("TopicTest", "*"); consumer.setConsumeTimestamp("20180422221800");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.registerMessageListener(new MessageListenerConcurrently() {
//wrong time format 2017_0422_221800
consumer.setConsumeTimestamp("20180422221800"); @Override
consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
@Override printBody(msgs);
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); }
printBody(msgs); });
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; consumer.start();
} System.out.printf("Consumer Started.%n");
}); }
consumer.start();
System.out.printf("Consumer Started.%n"); public static void pullConsumer() throws MQClientException {
} DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_6", getAalRPCHook());
consumer.setNamesrvAddr("127.0.0.1:9876");
public static void pullConsumer() throws MQClientException { consumer.start();
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_6" , getAalRPCHook());
consumer.setNamesrvAddr("127.0.0.1:9876"); Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");
consumer.start(); for (MessageQueue mq : mqs) {
System.out.printf("Consume from the queue: %s%n", mq);
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest"); SINGLE_MQ:
for (MessageQueue mq : mqs) { while (true) {
System.out.printf("Consume from the queue: %s%n", mq); try {
SINGLE_MQ: PullResult pullResult =
while (true) { consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
try { System.out.printf("%s%n", pullResult);
PullResult pullResult = putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); printBody(pullResult);
System.out.printf("%s%n", pullResult); switch (pullResult.getPullStatus()) {
putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); case FOUND:
printBody(pullResult); break;
switch (pullResult.getPullStatus()) { case NO_MATCHED_MSG:
case FOUND: break;
break; case NO_NEW_MSG:
case NO_MATCHED_MSG: break SINGLE_MQ;
break; case OFFSET_ILLEGAL:
case NO_NEW_MSG: break;
break SINGLE_MQ; default:
case OFFSET_ILLEGAL: break;
break; }
default: } catch (Exception e) {
break; e.printStackTrace();
} }
} catch (Exception e) { }
e.printStackTrace(); }
}
} consumer.shutdown();
} }
consumer.shutdown(); private static void printBody(PullResult pullResult) {
} printBody(pullResult.getMsgFoundList());
}
private static void printBody(PullResult pullResult) {
printBody(pullResult.getMsgFoundList()); private static void printBody(List<MessageExt> msg) {
} if (msg == null || msg.size() == 0)
return;
private static void printBody(List<MessageExt> msg) { for (MessageExt m : msg) {
if(msg == null || msg.size() == 0) if (m != null) {
return; System.out.printf("msgId : %s body : %s \n\r", m.getMsgId(), new String(m.getBody()));
for(MessageExt m : msg) { }
if(m != null) { }
System.out.printf("msgId : %s body : %s",m.getMsgId() , new String(m.getBody())); }
System.out.println();
} private static long getMessageQueueOffset(MessageQueue mq) {
} Long offset = OFFSE_TABLE.get(mq);
} if (offset != null)
return offset;
private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = OFFSE_TABLE.get(mq); return 0;
if (offset != null) }
return offset;
private static void putMessageQueueOffset(MessageQueue mq, long offset) {
return 0; OFFSE_TABLE.put(mq, offset);
} }
private static void putMessageQueueOffset(MessageQueue mq, long offset) { static RPCHook getAalRPCHook() {
OFFSE_TABLE.put(mq, offset); return new AalRPCHook(ACL_RCPHOOK_ACCOUT, ACL_RCPHOOK_PASSWORD);
} }
static RPCHook getAalRPCHook() { static class AalRPCHook implements RPCHook {
return new AalRPCHook(ALC_RCP_HOOK_ACCOUT, ACL_RCP_HOOK_PASSWORD);
} private String account;
private String password;
static class AalRPCHook implements RPCHook{
public AalRPCHook(String account, String password) {
private String account; this.account = account;
this.password = password;
private String password; }
public AalRPCHook(String account , String password) { @Override
this.account = account; public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
this.password = password;
} HashMap<String, String> ext = request.getExtFields();
if (ext == null) {
@Override ext = new HashMap<>();
public void doBeforeRequest(String remoteAddr, RemotingCommand request) { request.setExtFields(ext);
}
HashMap<String, String> ext = request.getExtFields(); ext.put("account", this.account);
if(ext == null) { ext.put("password", this.password);
ext = new HashMap<>(); }
request.setExtFields(ext);
} @Override
ext.put("account", this.account); public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
ext.put("password", this.password); // TODO Auto-generated method stub
}
}
@Override
public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) { }
// TODO Auto-generated method stub
}
}
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册