From aeea021581ac4c49f903561219f2dd631be86d5f Mon Sep 17 00:00:00 2001 From: dongeforever Date: Sat, 27 Oct 2018 14:52:51 +0800 Subject: [PATCH] Draft the rpc hook and access validator plugin mechanism --- .../apache/rocketmq/acl/AccessResource.java | 21 +++++++ .../apache/rocketmq/acl/AccessValidator.java | 35 +++++++++++ .../rocketmq/acl/DefaultAccessValidator.java | 31 ++++++++++ .../rocketmq/broker/BrokerController.java | 61 +++++++++---------- .../rocketmq/broker/util/ServiceProvider.java | 8 +++ .../apache/rocketmq/common/BrokerConfig.java | 12 ++-- .../remoting/netty/NettyRemotingAbstract.java | 4 +- .../remoting/netty/NettyRemotingClient.java | 1 - .../remoting/netty/NettyRemotingServer.java | 6 +- 9 files changed, 133 insertions(+), 46 deletions(-) create mode 100644 acl-plug/src/main/java/org/apache/rocketmq/acl/AccessResource.java create mode 100644 acl-plug/src/main/java/org/apache/rocketmq/acl/AccessValidator.java create mode 100644 acl-plug/src/main/java/org/apache/rocketmq/acl/DefaultAccessValidator.java diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/AccessResource.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/AccessResource.java new file mode 100644 index 00000000..e30febc5 --- /dev/null +++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/AccessResource.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.acl; + +public interface AccessResource { +} diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/AccessValidator.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/AccessValidator.java new file mode 100644 index 00000000..d573e56c --- /dev/null +++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/AccessValidator.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.acl; + +import org.apache.rocketmq.remoting.protocol.RemotingCommand; + +public interface AccessValidator { + /** + * Parse to get the AccessResource(user, resource, needed permission) + * @param request + * @return + */ + AccessResource parse(RemotingCommand request); + + /** + * Validate the access resource. + * @param accessResource + */ + void validate(AccessResource accessResource) ; +} diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/DefaultAccessValidator.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/DefaultAccessValidator.java new file mode 100644 index 00000000..859cc809 --- /dev/null +++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/DefaultAccessValidator.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.acl; + +import org.apache.rocketmq.remoting.protocol.RemotingCommand; + +public class DefaultAccessValidator implements AccessValidator { + + @Override public AccessResource parse(RemotingCommand request) { + return null; + } + + @Override public void validate(AccessResource accessResource) { + + } +} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index c30d1f33..7a4c105e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -19,7 +19,6 @@ package org.apache.rocketmq.broker; import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -32,11 +31,8 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.acl.AccessValidator; import org.apache.rocketmq.acl.plug.AclPlugController; -import org.apache.rocketmq.acl.plug.AclRemotingService; -import org.apache.rocketmq.acl.plug.entity.AccessControl; -import org.apache.rocketmq.acl.plug.entity.ControllerParameters; import org.apache.rocketmq.broker.client.ClientHousekeepingService; import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener; import org.apache.rocketmq.broker.client.ConsumerManager; @@ -476,7 +472,8 @@ public class BrokerController { } } initialTransaction(); - initialAclPlug(); + initialAcl(); + initialRpcHooks(); } return result; } @@ -496,44 +493,42 @@ public class BrokerController { this.transactionalMessageCheckService = new TransactionalMessageCheckService(this); } - private void initialAclPlug() { - try { - if (!this.brokerConfig.isAclPlug()) { - log.info("Default does not start acl plug"); - return; - } - ControllerParameters controllerParameters = new ControllerParameters(); - controllerParameters.setFileHome(brokerConfig.getRocketmqHome()); - aclPlugController = new AclPlugController(controllerParameters); - if (!aclPlugController.isStartSucceed()) { - log.error("start acl plug failure"); - return; - } - final AclRemotingService aclRemotingService = aclPlugController.getAclRemotingService(); + private void initialAcl() { + if (!this.brokerConfig.isEnableAcl()) { + log.info("The broker dose not enable acl"); + return; + } + + List accessValidators = ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class); + if (accessValidators == null || accessValidators.isEmpty()) { + return; + } + + for (AccessValidator accessValidator: accessValidators) { + final AccessValidator validator = accessValidator; this.registerServerRPCHook(new RPCHook() { @Override public void doBeforeRequest(String remoteAddr, RemotingCommand request) { - HashMap extFields = request.getExtFields(); - AccessControl accessControl = new AccessControl(); - accessControl.setCode(request.getCode()); - accessControl.setRecognition(remoteAddr); - if (extFields != null) { - accessControl.setAccount(extFields.get("account")); - accessControl.setPassword(extFields.get("password")); - accessControl.setNetaddress(StringUtils.split(remoteAddr, ":")[0]); - accessControl.setTopic(extFields.get("topic")); - } - aclRemotingService.check(accessControl); + validator.validate(validator.parse(request)); } @Override public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) { } }); + } + } - } catch (Exception e) { - log.error(e.getMessage(), e); + + private void initialRpcHooks() { + + List rpcHooks = ServiceProvider.load(ServiceProvider.RPC_HOOK_ID, RPCHook.class); + if (rpcHooks == null || rpcHooks.isEmpty()) { + return; + } + for (RPCHook rpcHook: rpcHooks) { + this.registerServerRPCHook(rpcHook); } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java b/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java index 8b9b63e4..e6796601 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java @@ -34,6 +34,14 @@ public class ServiceProvider { public static final String TRANSACTION_LISTENER_ID = "META-INF/service/org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener"; + + public static final String RPC_HOOK_ID = "META-INF/service/org.apache.rocketmq.remoting.RPCHook"; + + + public static final String ACL_VALIDATOR_ID = "META-INF/service/org.apache.rocketmq.acl.AccessValidator"; + + + static { thisClassLoader = getClassLoader(ServiceProvider.class); } diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index 6e11de20..60bd7ce4 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -171,7 +171,8 @@ public class BrokerConfig { @ImportantField private long transactionCheckInterval = 60 * 1000; - private boolean isAclPlug; + private boolean enableAcl; + public static String localHostName() { try { @@ -711,12 +712,12 @@ public class BrokerConfig { this.transactionCheckInterval = transactionCheckInterval; } - public boolean isAclPlug() { - return isAclPlug; + public boolean isEnableAcl() { + return enableAcl; } - public void setAclPlug(boolean isAclPlug) { - this.isAclPlug = isAclPlug; + public void setEnableAcl(boolean isAclPlug) { + this.enableAcl = isAclPlug; } public int getEndTransactionThreadPoolNums() { @@ -742,5 +743,4 @@ public class BrokerConfig { public void setWaitTimeMillsInTransactionQueue(long waitTimeMillsInTransactionQueue) { this.waitTimeMillsInTransactionQueue = waitTimeMillsInTransactionQueue; } - } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java index 206b96ad..28ae001b 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -36,6 +36,8 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.ChannelEventListener; import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.RPCHook; @@ -46,8 +48,6 @@ import org.apache.rocketmq.remoting.common.ServiceThread; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; -import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index e891ad72..90f51ff0 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -34,7 +34,6 @@ import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.DefaultEventExecutorGroup; -import io.netty.util.concurrent.EventExecutorGroup; import java.io.IOException; import java.net.SocketAddress; import java.security.cert.CertificateException; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java index 90386f37..ec34a4be 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java @@ -40,8 +40,6 @@ import io.netty.util.concurrent.DefaultEventExecutorGroup; import java.io.IOException; import java.net.InetSocketAddress; import java.security.cert.CertificateException; -import java.util.ArrayList; -import java.util.List; import java.util.NoSuchElementException; import java.util.Timer; import java.util.TimerTask; @@ -49,6 +47,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.ChannelEventListener; import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.RPCHook; @@ -60,8 +60,6 @@ import org.apache.rocketmq.remoting.common.TlsMode; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; -import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.protocol.RemotingCommand; public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer { -- GitLab