提交 8a0219df 编写于 作者: H huzongtang

[ISSUE#724]Integrated ACL Interceptor into SnodeController.

上级 c7fdb765
......@@ -69,10 +69,6 @@
<artifactId>logback-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-remoting</artifactId>
</dependency>
</dependencies>
</project>
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
globalWhiteRemoteAddresses:
accounts:
- accessKey: RocketMQ
secretKey: 12345678
whiteRemoteAddress:
admin: false
defaultTopicPerm: DENY
defaultGroupPerm: SUB
topicPerms:
- topicA=DENY
- topicB=PUB|SUB
- topicC=SUB
groupPerms:
# the group should convert to retry topic
- groupA=DENY
- groupB=PUB|SUB
- groupC=SUB
- accessKey: rocketmq2
secretKey: 12345678
whiteRemoteAddress: 192.168.1.*
# if it is admin, it could access all resources
admin: true
......@@ -27,7 +27,7 @@ import org.apache.rocketmq.common.message.MessageExt;
public class SqlConsumer {
public static void main(String[] args) throws Exception{
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// Don't forget to set enablePropertyFilter=true in broker
......
......@@ -525,6 +525,11 @@
<artifactId>rocketmq-example</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
......
......@@ -162,6 +162,13 @@ public class RemotingUtil {
return sb.toString();
}
public static String socketAddress2IpString(final SocketAddress addr) {
StringBuilder sb = new StringBuilder();
InetSocketAddress inetSocketAddress = (InetSocketAddress) addr;
sb.append(inetSocketAddress.getAddress().getHostAddress());
return sb.toString();
}
public static SocketChannel connect(SocketAddress remote) {
return connect(remote, 1000 * 5);
}
......
......@@ -40,6 +40,8 @@ public class ServiceProvider {
public static final String TRANSACTION_LISTENER_ID = "META-INF/service/org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener";
public static final String ACL_VALIDATOR_ID = "META-INF/service/org.apache.rocketmq.acl.AccessValidator";
static {
thisClassLoader = getClassLoader(ServiceProvider.class);
}
......
......@@ -52,6 +52,10 @@
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-filter</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-acl</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
......
......@@ -22,6 +22,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.RequestCode;
......@@ -34,9 +35,15 @@ import org.apache.rocketmq.remoting.RemotingClientFactory;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.RemotingServerFactory;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.interceptor.ExceptionContext;
import org.apache.rocketmq.remoting.interceptor.Interceptor;
import org.apache.rocketmq.remoting.interceptor.InterceptorFactory;
import org.apache.rocketmq.remoting.interceptor.InterceptorGroup;
import org.apache.rocketmq.remoting.interceptor.RequestContext;
import org.apache.rocketmq.remoting.interceptor.ResponseContext;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.util.ServiceProvider;
import org.apache.rocketmq.snode.client.ClientHousekeepingService;
import org.apache.rocketmq.snode.client.ClientManager;
import org.apache.rocketmq.snode.client.SlowConsumerService;
......@@ -201,10 +208,48 @@ public class SnodeController {
this.registerProcessor();
initSnodeInterceptorGroup();
initRemotingServerInterceptorGroup();
initAclInterceptorGroup();
this.snodeServer.registerInterceptorGroup(this.remotingServerInterceptorGroup);
return true;
}
private void initAclInterceptorGroup() {
if (!this.snodeConfig.isAclEnable()) {
log.info("The snode dose not enable acl");
return;
}
List<AccessValidator> accessValidators = ServiceProvider.loadServiceList(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class);
if (accessValidators == null || accessValidators.isEmpty()) {
log.info("The snode dose not load the AccessValidator");
return;
}
for (AccessValidator accessValidator: accessValidators) {
final AccessValidator validator = accessValidator;
this.remotingServerInterceptorGroup.registerInterceptor(new Interceptor() {
@Override
public String interceptorName() {
return "snodeRequestAclControlInterceptor";
}
@Override public void beforeRequest(RequestContext requestContext) {
//Do not catch the exception
RemotingCommand request = requestContext.getRequest();
String remoteAddr = RemotingUtil.socketAddress2IpString(requestContext.getRemotingChannel().remoteAddress());
validator.validate(validator.parse(request, remoteAddr));
}
@Override public void afterRequest(ResponseContext responseContext) { }
@Override public void onException(ExceptionContext exceptionContext) { }
});
}
}
private void initSnodeInterceptorGroup() {
List<Interceptor> consumeMessageInterceptors = InterceptorFactory.getInstance().loadInterceptors(this.snodeConfig.getConsumeMessageInterceptorPath());
if (consumeMessageInterceptors != null && consumeMessageInterceptors.size() > 0) {
......
......@@ -79,6 +79,12 @@ public class SnodeConfig {
private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true"));
/**
* Acl feature switch
*/
@ImportantField
private boolean aclEnable = false;
public void setSnodeHeartBeatInterval(long snodeHeartBeatInterval) {
this.snodeHeartBeatInterval = snodeHeartBeatInterval;
}
......@@ -291,4 +297,12 @@ public class SnodeConfig {
public void setSlowConsumerThreshold(int slowConsumerThreshold) {
this.slowConsumerThreshold = slowConsumerThreshold;
}
public boolean isAclEnable() {
return aclEnable;
}
public void setAclEnable(boolean aclEnable) {
this.aclEnable = aclEnable;
}
}
......@@ -17,17 +17,17 @@
package org.apache.rocketmq.snode.flowcontrol;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.flowcontrol.AbstractFlowControlService;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingRuntimeException;
import org.apache.rocketmq.remoting.interceptor.RequestContext;
import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;
public class QPSFlowControlServiceImpl extends AbstractFlowControlService {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private final AtomicLong logCount = new AtomicLong(0);
......
org.apache.rocketmq.acl.plain.PlainAccessValidator
\ No newline at end of file
......@@ -36,4 +36,18 @@ public class SnodeControllerTest {
snodeController.shutdown();
}
@Test
public void testSnodeRestartWithAclEnable() {
SnodeConfig snodeConfig = new SnodeConfig();
snodeConfig.setAclEnable(true);
SnodeController snodeController = new SnodeController(
new ServerConfig(),
new ClientConfig(),
snodeConfig);
assertThat(snodeController.initialize());
snodeController.start();
snodeController.shutdown();
}
}
snode:
countLimit: # flow control type, only requestCount & requestSize support
- flowControlResourceName: 310
flowControlGrade: 1
flowControlBehavior: 1
flowControlResourceCount: 500.00 #QPS
- flowControlResourceName: overall
flowControlGrade: 1
flowControlBehavior: 1
flowControlResourceCount: 10000.00 #QPS
sizeLimit:
- flowControlResourceName: 310
flowControlGrade: 1
flowControlBehavior: 1
flowControlResourceCount: 5.00 #KB/S
topicLimit:
\ No newline at end of file
org.apache.rocketmq.acl.plain.PlainAccessValidator
\ No newline at end of file
org.apache.rocketmq.snode.flowcontrol.QPSFlowControlServiceImpl
org.apache.rocketmq.snode.flowcontrol.RequestSizeFlowControlServiceImpl
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册