未验证 提交 2d4cda26 编写于 作者: Z Zhendong Liu 提交者: GitHub

Merge pull request #519 from githublaohu/feature_acl

[ISSUE#403]Implement the acl strandard for rocketmq
......@@ -18,8 +18,8 @@
<artifactId>rocketmq-all</artifactId>
<version>4.4.0-SNAPSHOT</version>
</parent>
<artifactId>rocketmq-acl-plug</artifactId>
<name>rocketmq-acl-plug ${project.version}</name>
<artifactId>rocketmq-acl</artifactId>
<name>rocketmq-acl ${project.version}</name>
<url>http://maven.apache.org</url>
<properties>
......
......@@ -22,14 +22,16 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public interface AccessValidator {
/**
* Parse to get the AccessResource(user, resource, needed permission)
*
* @param request
* @return
*/
AccessResource parse(RemotingCommand request);
AccessResource parse(RemotingCommand request, String remoteAddr);
/**
* Validate the access resource.
*
* @param accessResource
*/
void validate(AccessResource accessResource) ;
void validate(AccessResource accessResource);
}
......@@ -21,11 +21,13 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class DefaultAccessValidator implements AccessValidator {
@Override public AccessResource parse(RemotingCommand request) {
@Override
public AccessResource parse(RemotingCommand request, String remoteAddr) {
return null;
}
@Override public void validate(AccessResource accessResource) {
@Override
public void validate(AccessResource accessResource) {
}
}
......@@ -16,14 +16,15 @@
*/
package org.apache.rocketmq.acl.plug;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
public class AccessContralAnalysis {
......
......@@ -16,15 +16,28 @@
*/
package org.apache.rocketmq.acl.plug;
import java.util.HashMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.AccessResource;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.acl.plug.engine.AclPlugEngine;
import org.apache.rocketmq.acl.plug.engine.PlainAclPlugEngine;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.entity.AuthenticationResult;
import org.apache.rocketmq.acl.plug.entity.ControllerParameters;
import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class DefaultAclRemotingServiceImpl implements AclRemotingService {
public class DefaultAclRemotingServiceImpl implements AclRemotingService, AccessValidator {
private AclPlugEngine aclPlugEngine;
public DefaultAclRemotingServiceImpl() {
ControllerParameters controllerParameters = new ControllerParameters();
this.aclPlugEngine = new PlainAclPlugEngine(controllerParameters);
this.aclPlugEngine.initialize();
}
public DefaultAclRemotingServiceImpl(AclPlugEngine aclPlugEngine) {
this.aclPlugEngine = aclPlugEngine;
}
......@@ -41,4 +54,34 @@ public class DefaultAclRemotingServiceImpl implements AclRemotingService {
return authenticationResult;
}
@Override
public AccessResource parse(RemotingCommand request, String remoteAddr) {
HashMap<String, String> extFields = request.getExtFields();
AccessControl accessControl = new AccessControl();
accessControl.setCode(request.getCode());
accessControl.setRecognition(remoteAddr);
accessControl.setNetaddress(StringUtils.split(remoteAddr, ":")[0]);
if (extFields != null) {
accessControl.setAccount(extFields.get("account"));
accessControl.setPassword(extFields.get("password"));
accessControl.setTopic(extFields.get("topic"));
}
return accessControl;
}
@Override
public void validate(AccessResource accessResource) {
try {
AuthenticationResult authenticationResult = aclPlugEngine.eachCheckAuthentication((AccessControl) accessResource);
if (authenticationResult.getException() != null) {
throw new AclPlugRuntimeException(String.format("eachCheck the inspection appear exception, accessControl data is %s", accessResource.toString()), authenticationResult.getException());
}
if (authenticationResult.getAccessControl() == null || !authenticationResult.isSucceed()) {
throw new AclPlugRuntimeException(String.format("%s accessControl data is %s", authenticationResult.getResultString(), accessResource.toString()));
}
} catch (Exception e) {
throw new AclPlugRuntimeException(String.format("validate exception AccessResource data %s", accessResource.toString()), e);
}
}
}
......@@ -31,5 +31,7 @@ public interface AclPlugEngine {
public AuthenticationResult eachCheckLoginAndAuthentication(AccessControl accessControl);
public AuthenticationResult eachCheckAuthentication(AccessControl accessControl);
public void initialize();
}
......@@ -16,9 +16,11 @@
*/
package org.apache.rocketmq.acl.plug.engine;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.acl.plug.AccessContralAnalysis;
import org.apache.rocketmq.acl.plug.Authentication;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
......@@ -37,7 +39,7 @@ public abstract class AuthenticationInfoManagementAclPlugEngine implements AclPl
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.ACL_PLUG_LOGGER_NAME);
ControllerParameters controllerParameters;
private Map<String/** account **/, Map<String/** netaddress **/, AuthenticationInfo>> accessControlMap = new HashMap<>();
private Map<String/** account **/, List<AuthenticationInfo>> accessControlMap = new HashMap<>();
private AuthenticationInfo authenticationInfo;
private NetaddressStrategyFactory netaddressStrategyFactory = new NetaddressStrategyFactory();
private AccessContralAnalysis accessContralAnalysis = new AccessContralAnalysis();
......@@ -54,16 +56,16 @@ public abstract class AuthenticationInfoManagementAclPlugEngine implements AclPl
}
try {
NetaddressStrategy netaddressStrategy = netaddressStrategyFactory.getNetaddressStrategy(accessControl);
Map<String, AuthenticationInfo> accessControlAddressMap = accessControlMap.get(accessControl.getAccount());
if (accessControlAddressMap == null) {
accessControlAddressMap = new HashMap<>();
accessControlMap.put(accessControl.getAccount(), accessControlAddressMap);
List<AuthenticationInfo> accessControlAddressList = accessControlMap.get(accessControl.getAccount());
if (accessControlAddressList == null) {
accessControlAddressList = new ArrayList<>();
accessControlMap.put(accessControl.getAccount(), accessControlAddressList);
}
AuthenticationInfo authenticationInfo = new AuthenticationInfo(accessContralAnalysis.analysis(accessControl), accessControl, netaddressStrategy);
accessControlAddressMap.put(accessControl.getNetaddress(), authenticationInfo);
accessControlAddressList.add(authenticationInfo);
log.info("authenticationInfo is {}", authenticationInfo.toString());
} catch (Exception e) {
throw new AclPlugRuntimeException(String.format("Exception info %s %s" ,e.getMessage() , accessControl.toString()), e);
throw new AclPlugRuntimeException(String.format("Exception info %s %s", e.getMessage(), accessControl.toString()), e);
}
}
......@@ -84,24 +86,19 @@ public abstract class AuthenticationInfoManagementAclPlugEngine implements AclPl
}
public AuthenticationInfo getAccessControl(AccessControl accessControl) {
AuthenticationInfo existing = null;
if (accessControl.getAccount() == null && authenticationInfo != null) {
existing = authenticationInfo.getNetaddressStrategy().match(accessControl) ? authenticationInfo : null;
return authenticationInfo.getNetaddressStrategy().match(accessControl) ? authenticationInfo : null;
} else {
Map<String, AuthenticationInfo> accessControlAddressMap = accessControlMap.get(accessControl.getAccount());
if (accessControlAddressMap != null) {
existing = accessControlAddressMap.get(accessControl.getNetaddress());
if (existing == null)
return null;
if (existing.getAccessControl().getPassword().equals(accessControl.getPassword())) {
if (existing.getNetaddressStrategy().match(accessControl)) {
return existing;
List<AuthenticationInfo> accessControlAddressList = accessControlMap.get(accessControl.getAccount());
if (accessControlAddressList != null) {
for (AuthenticationInfo ai : accessControlAddressList) {
if (ai.getNetaddressStrategy().match(accessControl) && ai.getAccessControl().getPassword().equals(accessControl.getPassword())) {
return ai;
}
}
existing = null;
}
}
return existing;
return null;
}
@Override
......@@ -112,6 +109,7 @@ public abstract class AuthenticationInfoManagementAclPlugEngine implements AclPl
if (authenticationInfo != null) {
boolean boo = authentication.authentication(authenticationInfo, accessControl, authenticationResult);
authenticationResult.setSucceed(boo);
authenticationResult.setAccessControl(authenticationInfo.getAccessControl());
}
} catch (Exception e) {
authenticationResult.setException(e);
......@@ -119,6 +117,21 @@ public abstract class AuthenticationInfoManagementAclPlugEngine implements AclPl
return authenticationResult;
}
public AuthenticationResult eachCheckAuthentication(AccessControl accessControl) {
AuthenticationResult authenticationResult = new AuthenticationResult();
AuthenticationInfo authenticationInfo = getAccessControl(accessControl);
if (authenticationInfo != null) {
boolean boo = authentication.authentication(authenticationInfo, accessControl, authenticationResult);
authenticationResult.setSucceed(boo);
authenticationResult.setAccessControl(authenticationInfo.getAccessControl());
} else {
authenticationResult.setResultString("accessControl is null, Please check login, password, IP\"");
}
return authenticationResult;
}
void setBorkerAccessControlTransport(BorkerAccessControlTransport transport) {
if (transport.getOnlyNetAddress() == null && (transport.getList() == null || transport.getList().size() == 0)) {
throw new AclPlugRuntimeException("onlyNetAddress and list can't be all empty");
......
......@@ -18,6 +18,7 @@ package org.apache.rocketmq.acl.plug.engine;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.entity.AuthenticationInfo;
import org.apache.rocketmq.acl.plug.entity.AuthenticationResult;
......
......@@ -16,14 +16,15 @@
*/
package org.apache.rocketmq.acl.plug.engine;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import org.apache.rocketmq.acl.plug.entity.BorkerAccessControlTransport;
import org.apache.rocketmq.acl.plug.entity.ControllerParameters;
import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
import org.yaml.snakeyaml.Yaml;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
public class PlainAclPlugEngine extends LoginInfoAclPlugEngine {
public PlainAclPlugEngine(
......
......@@ -16,7 +16,9 @@
*/
package org.apache.rocketmq.acl.plug.entity;
public class AccessControl {
import org.apache.rocketmq.acl.AccessResource;
public class AccessControl implements AccessResource {
private String account;
......
......@@ -16,10 +16,11 @@
*/
package org.apache.rocketmq.acl.plug.entity;
import org.apache.rocketmq.acl.plug.strategy.NetaddressStrategy;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.rocketmq.acl.plug.strategy.NetaddressStrategy;
public class AuthenticationInfo {
......
......@@ -16,11 +16,12 @@
*/
package org.apache.rocketmq.acl.plug.entity;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.protocol.RequestCode;
public class ControllerParameters {
private String fileHome;
private String fileHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
private Class<?> accessContralAnalysisClass = RequestCode.class;
......
......@@ -16,13 +16,14 @@
*/
package org.apache.rocketmq.acl.plug.strategy;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.plug.AclUtils;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
import java.util.HashSet;
import java.util.Set;
public class NetaddressStrategyFactory {
public static final NullNetaddressStrategy NULL_NET_ADDRESS_STRATEGY = new NullNetaddressStrategy();
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.plug;
import java.util.HashMap;
import org.apache.rocketmq.acl.AccessResource;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.entity.AuthenticationResult;
import org.apache.rocketmq.acl.plug.entity.BorkerAccessControl;
import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;;
public class AclRemotingServiceTest {
AclRemotingService defaultAclService;
AccessValidator accessValidator;
AccessControl accessControl;
AccessControl accessControlTwo;
@Before
public void init() {
System.setProperty("rocketmq.home.dir", "src/test/resources");
DefaultAclRemotingServiceImpl aclRemotingServiceImpl = new DefaultAclRemotingServiceImpl();
defaultAclService = aclRemotingServiceImpl;
accessValidator = aclRemotingServiceImpl;
accessControl = new BorkerAccessControl();
accessControl.setAccount("RocketMQ");
accessControl.setPassword("1234567");
accessControl.setNetaddress("192.0.0.1");
accessControl.setRecognition("127.0.0.1:1");
accessControlTwo = new BorkerAccessControl();
accessControlTwo.setAccount("RocketMQ");
accessControlTwo.setPassword("1234567");
accessControlTwo.setNetaddress("192.0.2.1");
accessControlTwo.setRecognition("127.0.0.1:2");
}
@Test
public void defaultConstructorTest() {
System.setProperty("rocketmq.home.dir", "src/test/resources");
AclRemotingService defaultAclService = new DefaultAclRemotingServiceImpl();
Assert.assertNotNull(defaultAclService);
}
@Test
public void parseTest() {
RemotingCommand remotingCommand = RemotingCommand.createResponseCommand(34, "");
HashMap<String, String> map = new HashMap<>();
map.put("account", "RocketMQ");
map.put("password", "123456");
map.put("topic", "test");
remotingCommand.setExtFields(map);
AccessResource accessResource = accessValidator.parse(remotingCommand, "127.0.0.1:123");
AccessControl accessControl = (AccessControl) accessResource;
AccessControl newAccessControl = new AccessControl();
newAccessControl.setAccount("RocketMQ");
newAccessControl.setPassword("123456");
newAccessControl.setTopic("test");
newAccessControl.setCode(34);
newAccessControl.setNetaddress("127.0.0.1");
newAccessControl.setRecognition("127.0.0.1:123");
Assert.assertEquals(accessControl.toString(), newAccessControl.toString());
}
@Test
public void checkTest() {
accessControl.setCode(34);
AuthenticationResult authenticationResult = defaultAclService.check(accessControl);
Assert.assertTrue(authenticationResult.isSucceed());
}
@Test(expected = AclPlugRuntimeException.class)
public void checkAccessExceptionTest() {
accessControl.setCode(34);
accessControl.setAccount("Rocketmq");
defaultAclService.check(accessControl);
}
@Test(expected = AclPlugRuntimeException.class)
public void checkPasswordTest() {
accessControl.setCode(34);
accessControl.setPassword("123123123");
defaultAclService.check(accessControl);
}
@Test(expected = AclPlugRuntimeException.class)
public void checkCodeTest() {
accessControl.setCode(14434);
accessControl.setPassword("123123123");
defaultAclService.check(accessControl);
}
@Test
public void validateTest() {
accessControl.setCode(34);
accessValidator.validate(accessControl);
}
@Test(expected = AclPlugRuntimeException.class)
public void validateAccessExceptionTest() {
accessControl.setCode(34);
accessControl.setAccount("Rocketmq");
accessValidator.validate(accessControl);
}
@Test(expected = AclPlugRuntimeException.class)
public void validatePasswordTest() {
accessControl.setCode(34);
accessControl.setPassword("123123123");
accessValidator.validate(accessControl);
}
@Test(expected = AclPlugRuntimeException.class)
public void validateCodeTest() {
accessControl.setCode(14434);
accessControl.setPassword("123123123");
accessValidator.validate(accessControl);
}
}
......@@ -18,13 +18,12 @@ package org.apache.rocketmq.acl.plug;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class AclUtilsTest {
@Test
......
......@@ -57,46 +57,14 @@ public class PlainAclPlugEngineTest {
@Before
public void init() throws NoSuchFieldException, SecurityException, IOException {
System.setProperty("rocketmq.home.dir", "src/test/resources");
ControllerParameters controllerParametersEntity = new ControllerParameters();
Yaml ymal = new Yaml();
String home = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
InputStream fis = null;
if (home == null) {
URL url = PlainAclPlugEngineTest.class.getResource("/");
home = url.toString();
home = home.substring(0, home.length() - 1).replace("file:/", "").replace("target/test-classes", "");
home = home + "src/test/resources";
if (!new File(home + "/conf/transport.yml").exists()) {
home = "/home/travis/build/githublaohu/rocketmq/acl-plug/src/test/resources";
}
}
String filePath = home + "/conf/transport.yml";
try {
fis = new FileInputStream(new File(filePath));
transport = ymal.loadAs(fis, BorkerAccessControlTransport.class);
}catch(Exception e) {
AccessControl accessControl = new BorkerAccessControl();
accessControl.setAccount("onlyNetAddress");
accessControl.setPassword("aliyun11");
accessControl.setNetaddress("127.0.0.1");
accessControl.setRecognition("127.0.0.1:1");
transport = ymal.loadAs(new FileInputStream(new File(controllerParametersEntity.getFileHome()+"/conf/transport.yml")), BorkerAccessControlTransport.class);
AccessControl accessControlTwo = new BorkerAccessControl();
accessControlTwo.setAccount("listTransport");
accessControlTwo.setPassword("aliyun1");
accessControlTwo.setNetaddress("127.0.0.1");
accessControlTwo.setRecognition("127.0.0.1:2");
transport = new BorkerAccessControlTransport();
transport.setOnlyNetAddress((BorkerAccessControl)accessControl);
}
ControllerParameters controllerParametersEntity = new ControllerParameters();
controllerParametersEntity.setFileHome(null);
try {
plainAclPlugEngine = new PlainAclPlugEngine(controllerParametersEntity);
plainAclPlugEngine.initialize();
} catch (Exception e) {
}
accessControl = new BorkerAccessControl();
accessControl.setAccount("rokcetmq");
......@@ -142,6 +110,7 @@ public class PlainAclPlugEngineTest {
@Test(expected = AclPlugRuntimeException.class)
public void testPlainAclPlugEngineInit() {
ControllerParameters controllerParametersEntity = new ControllerParameters();
controllerParametersEntity.setFileHome("");
new PlainAclPlugEngine(controllerParametersEntity).initialize();
}
......
......@@ -50,7 +50,7 @@
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-acl-plug</artifactId>
<artifactId>rocketmq-acl</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
......
......@@ -32,7 +32,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.acl.plug.AclPlugController;
import org.apache.rocketmq.broker.client.ClientHousekeepingService;
import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
import org.apache.rocketmq.broker.client.ConsumerManager;
......@@ -160,7 +159,6 @@ public class BrokerController {
private TransactionalMessageService transactionalMessageService;
private AbstractTransactionalMessageCheckListener transactionalMessageCheckListener;
private AclPlugController aclPlugController;
public BrokerController(
final BrokerConfig brokerConfig,
......@@ -510,7 +508,7 @@ public class BrokerController {
@Override
public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
validator.validate(validator.parse(request));
validator.validate(validator.parse(request, remoteAddr));
}
@Override
......@@ -1095,9 +1093,6 @@ public class BrokerController {
this.transactionalMessageCheckListener = transactionalMessageCheckListener;
}
public AclPlugController getAclPlugController() {
return this.aclPlugController;
}
public BlockingQueue<Runnable> getEndTransactionThreadPoolQueue() {
return endTransactionThreadPoolQueue;
......
......@@ -72,9 +72,6 @@ public class ClientHousekeepingService implements ChannelEventListener {
this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);
this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
if (this.brokerController.getAclPlugController() != null && this.brokerController.getAclPlugController().isStartSucceed()) {
this.brokerController.getAclPlugController().doChannelCloseEvent(remoteAddr);
}
}
@Override
......
......@@ -17,12 +17,15 @@
package org.apache.rocketmq.broker.util;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.List;
public class ServiceProviderTest {
@Test
......@@ -38,4 +41,10 @@ public class ServiceProviderTest {
AbstractTransactionalMessageCheckListener.class);
assertThat(listener).isNotNull();
}
@Test
public void loadAccessValidatorTest() {
List<AccessValidator> accessValidators = ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class);
assertThat(accessValidators).isNotNull();
}
}
org.apache.rocketmq.acl.plug.DefaultAclRemotingServiceImpl
\ No newline at end of file
......@@ -20,5 +20,5 @@ deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
aclPlug=true
enableAcl=true
namesrvAddr=127.0.0.1:9876
......@@ -14,20 +14,20 @@
# limitations under the License.
onlyNetAddress:
netaddress: 10.10.103.*
netaddress: 192.168.0.*
noPermitPullTopic:
- broker-a
list:
- account: RocketMQ
password: 1234567
netaddress: 192.0.0.*
netaddress: 192.168.0.*
permitSendTopic:
- test1
- TopicTest
- test2
- account: RocketMQ
password: 1234567
netaddress: 192.0.2.1
netaddress: 192.168.2.1
permitSendTopic:
- test3
- test4
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.simple;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
/**
*
* English explain
* 1. broker module src/test/resources/META-INF/service/org.apache.rocketmq.acl.AccessValidator copy to src/java/resources/META-INF/service.
*
* 2. view the /conf/transport.yml file under the distribution module, pay attention to the account password, IP.
*
* 3. Modify ALC_RCP_HOOK_ACCOUT and ACL_RCP_HOOK_PASSWORD to the corresponding account password in transport.yml
*
*/
public class AclClient {
private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();
private static final String ACL_RCPHOOK_ACCOUT = "RocketMQ";
private static final String ACL_RCPHOOK_PASSWORD = "1234567";
public static void main(String[] args) throws MQClientException, InterruptedException {
producer();
pushConsumer();
pullConsumer();
}
public static void producer() throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", getAalRPCHook());
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 128; i++)
try {
{
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
public static void pushConsumer() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_5", getAalRPCHook(), new AllocateMessageQueueAveragely());
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//wrong time format 2017_0422_221800
consumer.setConsumeTimestamp("20180422221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
printBody(msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
public static void pullConsumer() throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_6", getAalRPCHook());
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.start();
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");
for (MessageQueue mq : mqs) {
System.out.printf("Consume from the queue: %s%n", mq);
SINGLE_MQ:
while (true) {
try {
PullResult pullResult =
consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
System.out.printf("%s%n", pullResult);
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
printBody(pullResult);
switch (pullResult.getPullStatus()) {
case FOUND:
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break SINGLE_MQ;
case OFFSET_ILLEGAL:
break;
default:
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
consumer.shutdown();
}
private static void printBody(PullResult pullResult) {
printBody(pullResult.getMsgFoundList());
}
private static void printBody(List<MessageExt> msg) {
if (msg == null || msg.size() == 0)
return;
for (MessageExt m : msg) {
if (m != null) {
System.out.printf("msgId : %s body : %s \n\r", m.getMsgId(), new String(m.getBody()));
}
}
}
private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = OFFSE_TABLE.get(mq);
if (offset != null)
return offset;
return 0;
}
private static void putMessageQueueOffset(MessageQueue mq, long offset) {
OFFSE_TABLE.put(mq, offset);
}
static RPCHook getAalRPCHook() {
return new AalRPCHook(ACL_RCPHOOK_ACCOUT, ACL_RCPHOOK_PASSWORD);
}
static class AalRPCHook implements RPCHook {
private String account;
private String password;
public AalRPCHook(String account, String password) {
this.account = account;
this.password = password;
}
@Override
public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
HashMap<String, String> ext = request.getExtFields();
if (ext == null) {
ext = new HashMap<>();
request.setExtFields(ext);
}
ext.put("account", this.account);
ext.put("password", this.password);
}
@Override
public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
// TODO Auto-generated method stub
}
}
}
......@@ -525,7 +525,7 @@
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-acl-plug</artifactId>
<artifactId>rocketmq-acl</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
......
......@@ -282,7 +282,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
@Override
public void registerRPCHook(RPCHook rpcHook) {
if (!rpcHooks.contains(rpcHook)) {
if (rpcHook != null && !rpcHooks.contains(rpcHook)) {
rpcHooks.add(rpcHook);
}
}
......
......@@ -265,7 +265,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
@Override
public void registerRPCHook(RPCHook rpcHook) {
if (!rpcHooks.contains(rpcHook)) {
if (rpcHook != null && !rpcHooks.contains(rpcHook)) {
rpcHooks.add(rpcHook);
}
}
......
......@@ -19,9 +19,13 @@ package org.apache.rocketmq.test.base;
import java.util.ArrayList;
import java.util.List;
import org.apache.log4j.Logger;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
......@@ -48,6 +52,7 @@ public class BaseConf {
private static Logger log = Logger.getLogger(BaseConf.class);
static {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
namesrvController = IntegrationTestBase.createAndStartNamesrv();
nsAddr = "127.0.0.1:" + namesrvController.getNettyServerConfig().getListenPort();
brokerController1 = IntegrationTestBase.createAndStartBroker(nsAddr);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册