From ceaa64bb5611e31474231634aa3fc8382fec12a2 Mon Sep 17 00:00:00 2001 From: hujie Date: Fri, 23 Nov 2018 18:26:54 +0800 Subject: [PATCH] tools acl --- .../rocketmq/acl/PlainAccessValidator.java | 16 +++- .../rocketmq/acl/plug/PlainAclPlugEngine.java | 58 ++++++++++++++ .../apache/rocketmq/srvutil/ServerUtil.java | 10 +++ .../tools/command/MQAdminStartup.java | 80 +++++++++++++++++-- 4 files changed, 154 insertions(+), 10 deletions(-) diff --git a/acl/src/main/java/org/apache/rocketmq/acl/PlainAccessValidator.java b/acl/src/main/java/org/apache/rocketmq/acl/PlainAccessValidator.java index ef25a9c7..74e988a7 100644 --- a/acl/src/main/java/org/apache/rocketmq/acl/PlainAccessValidator.java +++ b/acl/src/main/java/org/apache/rocketmq/acl/PlainAccessValidator.java @@ -35,6 +35,7 @@ public class PlainAccessValidator implements AccessValidator { @Override public AccessResource parse(RemotingCommand request, String remoteAddr) { HashMap extFields = request.getExtFields(); + int code = request.getCode(); AccessControl accessControl = new AccessControl(); accessControl.setCode(request.getCode()); accessControl.setRecognition(remoteAddr); @@ -42,23 +43,30 @@ public class PlainAccessValidator implements AccessValidator { if (extFields != null) { accessControl.setAccount(extFields.get("account")); accessControl.setPassword(extFields.get("password")); - accessControl.setTopic(extFields.get("topic")); + if (code == 310 || code == 320) { + accessControl.setTopic(extFields.get("b")); + } else { + accessControl.setTopic(extFields.get("topic")); + + } } return accessControl; } @Override public void validate(AccessResource accessResource) { - AuthenticationResult authenticationResult = null; + AuthenticationResult authenticationResult = null; try { - authenticationResult = aclPlugEngine.eachCheckAuthentication((AccessControl) accessResource); + authenticationResult = aclPlugEngine.eachCheckAuthentication((AccessControl) accessResource); + if (authenticationResult.isSucceed()) + return; } catch (Exception e) { throw new AclPlugRuntimeException(String.format("validate exception AccessResource data %s", accessResource.toString()), e); } if (authenticationResult.getException() != null) { throw new AclPlugRuntimeException(String.format("eachCheck the inspection appear exception, accessControl data is %s", accessResource.toString()), authenticationResult.getException()); } - if (authenticationResult.getAccessControl() == null || !authenticationResult.isSucceed()) { + if (authenticationResult.getAccessControl() != null || !authenticationResult.isSucceed()) { throw new AclPlugRuntimeException(String.format("%s accessControl data is %s", authenticationResult.getResultString(), accessResource.toString())); } } diff --git a/acl/src/main/java/org/apache/rocketmq/acl/plug/PlainAclPlugEngine.java b/acl/src/main/java/org/apache/rocketmq/acl/plug/PlainAclPlugEngine.java index c255b59e..580595ca 100644 --- a/acl/src/main/java/org/apache/rocketmq/acl/plug/PlainAclPlugEngine.java +++ b/acl/src/main/java/org/apache/rocketmq/acl/plug/PlainAclPlugEngine.java @@ -16,7 +16,15 @@ */ package org.apache.rocketmq.acl.plug; +import java.io.IOException; import java.lang.reflect.Field; +import java.nio.file.FileSystems; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardWatchEventKinds; +import java.nio.file.WatchEvent; +import java.nio.file.WatchKey; +import java.nio.file.WatchService; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; @@ -25,6 +33,7 @@ import java.util.Map; import java.util.Map.Entry; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.logging.InternalLogger; @@ -49,6 +58,7 @@ public class PlainAclPlugEngine { public PlainAclPlugEngine() { initialize(); + watch(); } public void initialize() { @@ -61,6 +71,54 @@ public class PlainAclPlugEngine { setBorkerAccessControlTransport(accessControlTransport); } + private void watch() { + String version = System.getProperty("java.version"); + log.info("java.version is : {}", version); + String[] str = StringUtils.split(version, "."); + if (Integer.valueOf(str[1]) < 7) { + log.warn("wacth need jdk 1.7 support , current version no support"); + return; + } + try { + final WatchService watcher = FileSystems.getDefault().newWatchService(); + Path p = Paths.get(fileHome + "/conf/"); + p.register(watcher, StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.ENTRY_CREATE); + ServiceThread watcherServcie = new ServiceThread() { + + public void run() { + while (true) { + try { + while (true) { + WatchKey watchKey = watcher.take(); + List> watchEvents = watchKey.pollEvents(); + for (WatchEvent event : watchEvents) { + if ("transport.yml".equals(event.context().toString()) && + (StandardWatchEventKinds.ENTRY_MODIFY.equals(event.kind()) || StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind()))) { + log.info("transprot.yml make a difference change is : ", event.toString()); + initialize(); + } + } + watchKey.reset(); + } + } catch (InterruptedException e) { + log.error(e.getMessage(), e); + } + } + } + + @Override + public String getServiceName() { + return "watcherServcie"; + } + + }; + watcherServcie.start(); + log.info("succeed start watcherServcie"); + } catch (IOException e) { + log.error(e.getMessage(), e); + } + } + public void setAccessControl(AccessControl accessControl) throws AclPlugRuntimeException { if (accessControl.getAccount() == null || accessControl.getPassword() == null || accessControl.getAccount().length() <= 6 || accessControl.getPassword().length() <= 6) { diff --git a/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java b/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java index 066d36ce..8d4f2302 100644 --- a/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java +++ b/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java @@ -37,6 +37,16 @@ public class ServerUtil { opt.setRequired(false); options.addOption(opt); + + opt = new Option("account", "account", true, "acl want the parameters"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("password", "password", true, "acl want the parameters"); + opt.setRequired(false); + options.addOption(opt); + + return options; } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java index 6a51b7b4..d79e174a 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java @@ -16,16 +16,21 @@ */ package org.apache.rocketmq.tools.command; -import ch.qos.logback.classic.LoggerContext; -import ch.qos.logback.classic.joran.JoranConfigurator; -import ch.qos.logback.core.joran.spi.JoranException; - +import java.io.BufferedInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Properties; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.remoting.RPCHook; @@ -76,6 +81,10 @@ import org.apache.rocketmq.tools.command.topic.UpdateTopicPermSubCommand; import org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand; import org.slf4j.LoggerFactory; +import ch.qos.logback.classic.LoggerContext; +import ch.qos.logback.classic.joran.JoranConfigurator; +import ch.qos.logback.core.joran.spi.JoranException; + public class MQAdminStartup { protected static List subCommandList = new ArrayList(); @@ -129,7 +138,7 @@ public class MQAdminStartup { System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr); } - cmd.execute(commandLine, options, rpcHook); + cmd.execute(commandLine, options, getAclRPCHook(commandLine)); } else { System.out.printf("The sub command %s not exist.%n", args[0]); } @@ -211,7 +220,7 @@ public class MQAdminStartup { private static void printHelp() { System.out.printf("The most commonly used mqadmin commands are:%n"); - + System.out.println("ROCKETMQ_HOME Add tools.properties to the %ROCKETMQ_HOME%/conf/ directory or add -account xxxx -password xxxx Join when executing a command"); for (SubCommand cmd : subCommandList) { System.out.printf(" %-20s %s%n", cmd.commandName(), cmd.commandDesc()); } @@ -243,4 +252,63 @@ public class MQAdminStartup { public static void initCommand(SubCommand command) { subCommandList.add(command); } + + public static RPCHook getAclRPCHook(CommandLine commandLine) { + String account=null ,password = null; + if(commandLine.hasOption("account")) { + account = commandLine.getOptionValue("account"); + password = commandLine.getOptionValue("password"); + }else { + String fileHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY,System.getenv(MixAll.ROCKETMQ_HOME_ENV)); + File file = new File(fileHome+"/conf/tools.properties"); + if(!file.exists()) { + System.out.println("no find tools.properties , , Execution may fail without account andd password"); + System.out.println("ROCKETMQ_HOME Add tools.properties to the %ROCKETMQ_HOME%/conf/ directory or add -account xxxx -password xxxx Join when executing a command"); + return null; + } + InputStream in=null; + try { + in = new BufferedInputStream(new FileInputStream(file)); + Properties properties = new Properties(); + properties.load(in); + account = properties.getProperty("account"); + password = properties.getProperty("password"); + } catch (FileNotFoundException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + }finally { + if(in != null) { + try { + in.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + } + if(StringUtils.isNotBlank(account) && StringUtils.isNotBlank(password) ) { + final String newAccount = account; + final String newPassword = password; + return new RPCHook() { + + @Override + public void doBeforeRequest(String remoteAddr, RemotingCommand request) { + HashMap ext = request.getExtFields(); + if (ext == null) { + ext = new HashMap<>(); + request.setExtFields(ext); + } + ext.put("account", newAccount); + ext.put("password", newPassword); + } + + @Override + public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {} + }; + } + System.out.println("account andd password data incorrectness , Execution may fail without account andd password"); + System.out.println("ROCKETMQ_HOME Add tools.properties to the %ROCKETMQ_HOME%/conf/ directory or add -account xxxx -password xxxx Join when executing a command"); + return null; + } } -- GitLab