提交 aeea0215 编写于 作者: D dongeforever

Draft the rpc hook and access validator plugin mechanism

上级 76ab7bdb
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl;
public interface AccessResource {
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public interface AccessValidator {
/**
* Parse to get the AccessResource(user, resource, needed permission)
* @param request
* @return
*/
AccessResource parse(RemotingCommand request);
/**
* Validate the access resource.
* @param accessResource
*/
void validate(AccessResource accessResource) ;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class DefaultAccessValidator implements AccessValidator {
@Override public AccessResource parse(RemotingCommand request) {
return null;
}
@Override public void validate(AccessResource accessResource) {
}
}
...@@ -19,7 +19,6 @@ package org.apache.rocketmq.broker; ...@@ -19,7 +19,6 @@ package org.apache.rocketmq.broker;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
...@@ -32,11 +31,8 @@ import java.util.concurrent.LinkedBlockingQueue; ...@@ -32,11 +31,8 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.acl.plug.AclPlugController; import org.apache.rocketmq.acl.plug.AclPlugController;
import org.apache.rocketmq.acl.plug.AclRemotingService;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.entity.ControllerParameters;
import org.apache.rocketmq.broker.client.ClientHousekeepingService; import org.apache.rocketmq.broker.client.ClientHousekeepingService;
import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener; import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
import org.apache.rocketmq.broker.client.ConsumerManager; import org.apache.rocketmq.broker.client.ConsumerManager;
...@@ -476,7 +472,8 @@ public class BrokerController { ...@@ -476,7 +472,8 @@ public class BrokerController {
} }
} }
initialTransaction(); initialTransaction();
initialAclPlug(); initialAcl();
initialRpcHooks();
} }
return result; return result;
} }
...@@ -496,44 +493,42 @@ public class BrokerController { ...@@ -496,44 +493,42 @@ public class BrokerController {
this.transactionalMessageCheckService = new TransactionalMessageCheckService(this); this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
} }
private void initialAclPlug() { private void initialAcl() {
try { if (!this.brokerConfig.isEnableAcl()) {
if (!this.brokerConfig.isAclPlug()) { log.info("The broker dose not enable acl");
log.info("Default does not start acl plug"); return;
return; }
}
ControllerParameters controllerParameters = new ControllerParameters(); List<AccessValidator> accessValidators = ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class);
controllerParameters.setFileHome(brokerConfig.getRocketmqHome()); if (accessValidators == null || accessValidators.isEmpty()) {
aclPlugController = new AclPlugController(controllerParameters); return;
if (!aclPlugController.isStartSucceed()) { }
log.error("start acl plug failure");
return; for (AccessValidator accessValidator: accessValidators) {
} final AccessValidator validator = accessValidator;
final AclRemotingService aclRemotingService = aclPlugController.getAclRemotingService();
this.registerServerRPCHook(new RPCHook() { this.registerServerRPCHook(new RPCHook() {
@Override @Override
public void doBeforeRequest(String remoteAddr, RemotingCommand request) { public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
HashMap<String, String> extFields = request.getExtFields(); validator.validate(validator.parse(request));
AccessControl accessControl = new AccessControl();
accessControl.setCode(request.getCode());
accessControl.setRecognition(remoteAddr);
if (extFields != null) {
accessControl.setAccount(extFields.get("account"));
accessControl.setPassword(extFields.get("password"));
accessControl.setNetaddress(StringUtils.split(remoteAddr, ":")[0]);
accessControl.setTopic(extFields.get("topic"));
}
aclRemotingService.check(accessControl);
} }
@Override @Override
public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) { public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
} }
}); });
}
}
} catch (Exception e) {
log.error(e.getMessage(), e); private void initialRpcHooks() {
List<RPCHook> rpcHooks = ServiceProvider.load(ServiceProvider.RPC_HOOK_ID, RPCHook.class);
if (rpcHooks == null || rpcHooks.isEmpty()) {
return;
}
for (RPCHook rpcHook: rpcHooks) {
this.registerServerRPCHook(rpcHook);
} }
} }
......
...@@ -34,6 +34,14 @@ public class ServiceProvider { ...@@ -34,6 +34,14 @@ public class ServiceProvider {
public static final String TRANSACTION_LISTENER_ID = "META-INF/service/org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener"; public static final String TRANSACTION_LISTENER_ID = "META-INF/service/org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener";
public static final String RPC_HOOK_ID = "META-INF/service/org.apache.rocketmq.remoting.RPCHook";
public static final String ACL_VALIDATOR_ID = "META-INF/service/org.apache.rocketmq.acl.AccessValidator";
static { static {
thisClassLoader = getClassLoader(ServiceProvider.class); thisClassLoader = getClassLoader(ServiceProvider.class);
} }
......
...@@ -171,7 +171,8 @@ public class BrokerConfig { ...@@ -171,7 +171,8 @@ public class BrokerConfig {
@ImportantField @ImportantField
private long transactionCheckInterval = 60 * 1000; private long transactionCheckInterval = 60 * 1000;
private boolean isAclPlug; private boolean enableAcl;
public static String localHostName() { public static String localHostName() {
try { try {
...@@ -711,12 +712,12 @@ public class BrokerConfig { ...@@ -711,12 +712,12 @@ public class BrokerConfig {
this.transactionCheckInterval = transactionCheckInterval; this.transactionCheckInterval = transactionCheckInterval;
} }
public boolean isAclPlug() { public boolean isEnableAcl() {
return isAclPlug; return enableAcl;
} }
public void setAclPlug(boolean isAclPlug) { public void setEnableAcl(boolean isAclPlug) {
this.isAclPlug = isAclPlug; this.enableAcl = isAclPlug;
} }
public int getEndTransactionThreadPoolNums() { public int getEndTransactionThreadPoolNums() {
...@@ -742,5 +743,4 @@ public class BrokerConfig { ...@@ -742,5 +743,4 @@ public class BrokerConfig {
public void setWaitTimeMillsInTransactionQueue(long waitTimeMillsInTransactionQueue) { public void setWaitTimeMillsInTransactionQueue(long waitTimeMillsInTransactionQueue) {
this.waitTimeMillsInTransactionQueue = waitTimeMillsInTransactionQueue; this.waitTimeMillsInTransactionQueue = waitTimeMillsInTransactionQueue;
} }
} }
...@@ -36,6 +36,8 @@ import java.util.concurrent.LinkedBlockingQueue; ...@@ -36,6 +36,8 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener; import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RPCHook;
...@@ -46,8 +48,6 @@ import org.apache.rocketmq.remoting.common.ServiceThread; ...@@ -46,8 +48,6 @@ import org.apache.rocketmq.remoting.common.ServiceThread;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode; import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;
......
...@@ -34,7 +34,6 @@ import io.netty.handler.timeout.IdleState; ...@@ -34,7 +34,6 @@ import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler; import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup; import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import java.io.IOException; import java.io.IOException;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.security.cert.CertificateException; import java.security.cert.CertificateException;
......
...@@ -40,8 +40,6 @@ import io.netty.util.concurrent.DefaultEventExecutorGroup; ...@@ -40,8 +40,6 @@ import io.netty.util.concurrent.DefaultEventExecutorGroup;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.security.cert.CertificateException; import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
...@@ -49,6 +47,8 @@ import java.util.concurrent.ExecutorService; ...@@ -49,6 +47,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener; import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RPCHook;
...@@ -60,8 +60,6 @@ import org.apache.rocketmq.remoting.common.TlsMode; ...@@ -60,8 +60,6 @@ import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer { public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册