提交 ceaa64bb 编写于 作者: H hujie

tools acl

上级 65bd9bf8
......@@ -35,6 +35,7 @@ public class PlainAccessValidator implements AccessValidator {
@Override
public AccessResource parse(RemotingCommand request, String remoteAddr) {
HashMap<String, String> extFields = request.getExtFields();
int code = request.getCode();
AccessControl accessControl = new AccessControl();
accessControl.setCode(request.getCode());
accessControl.setRecognition(remoteAddr);
......@@ -42,23 +43,30 @@ public class PlainAccessValidator implements AccessValidator {
if (extFields != null) {
accessControl.setAccount(extFields.get("account"));
accessControl.setPassword(extFields.get("password"));
accessControl.setTopic(extFields.get("topic"));
if (code == 310 || code == 320) {
accessControl.setTopic(extFields.get("b"));
} else {
accessControl.setTopic(extFields.get("topic"));
}
}
return accessControl;
}
@Override
public void validate(AccessResource accessResource) {
AuthenticationResult authenticationResult = null;
AuthenticationResult authenticationResult = null;
try {
authenticationResult = aclPlugEngine.eachCheckAuthentication((AccessControl) accessResource);
authenticationResult = aclPlugEngine.eachCheckAuthentication((AccessControl) accessResource);
if (authenticationResult.isSucceed())
return;
} catch (Exception e) {
throw new AclPlugRuntimeException(String.format("validate exception AccessResource data %s", accessResource.toString()), e);
}
if (authenticationResult.getException() != null) {
throw new AclPlugRuntimeException(String.format("eachCheck the inspection appear exception, accessControl data is %s", accessResource.toString()), authenticationResult.getException());
}
if (authenticationResult.getAccessControl() == null || !authenticationResult.isSucceed()) {
if (authenticationResult.getAccessControl() != null || !authenticationResult.isSucceed()) {
throw new AclPlugRuntimeException(String.format("%s accessControl data is %s", authenticationResult.getResultString(), accessResource.toString()));
}
}
......
......@@ -16,7 +16,15 @@
*/
package org.apache.rocketmq.acl.plug;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
......@@ -25,6 +33,7 @@ import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.logging.InternalLogger;
......@@ -49,6 +58,7 @@ public class PlainAclPlugEngine {
public PlainAclPlugEngine() {
initialize();
watch();
}
public void initialize() {
......@@ -61,6 +71,54 @@ public class PlainAclPlugEngine {
setBorkerAccessControlTransport(accessControlTransport);
}
private void watch() {
String version = System.getProperty("java.version");
log.info("java.version is : {}", version);
String[] str = StringUtils.split(version, ".");
if (Integer.valueOf(str[1]) < 7) {
log.warn("wacth need jdk 1.7 support , current version no support");
return;
}
try {
final WatchService watcher = FileSystems.getDefault().newWatchService();
Path p = Paths.get(fileHome + "/conf/");
p.register(watcher, StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.ENTRY_CREATE);
ServiceThread watcherServcie = new ServiceThread() {
public void run() {
while (true) {
try {
while (true) {
WatchKey watchKey = watcher.take();
List<WatchEvent<?>> watchEvents = watchKey.pollEvents();
for (WatchEvent<?> event : watchEvents) {
if ("transport.yml".equals(event.context().toString()) &&
(StandardWatchEventKinds.ENTRY_MODIFY.equals(event.kind()) || StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind()))) {
log.info("transprot.yml make a difference change is : ", event.toString());
initialize();
}
}
watchKey.reset();
}
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
}
}
@Override
public String getServiceName() {
return "watcherServcie";
}
};
watcherServcie.start();
log.info("succeed start watcherServcie");
} catch (IOException e) {
log.error(e.getMessage(), e);
}
}
public void setAccessControl(AccessControl accessControl) throws AclPlugRuntimeException {
if (accessControl.getAccount() == null || accessControl.getPassword() == null
|| accessControl.getAccount().length() <= 6 || accessControl.getPassword().length() <= 6) {
......
......@@ -37,6 +37,16 @@ public class ServerUtil {
opt.setRequired(false);
options.addOption(opt);
opt = new Option("account", "account", true, "acl want the parameters");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("password", "password", true, "acl want the parameters");
opt.setRequired(false);
options.addOption(opt);
return options;
}
......
......@@ -16,16 +16,21 @@
*/
package org.apache.rocketmq.tools.command;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import ch.qos.logback.core.joran.spi.JoranException;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.remoting.RPCHook;
......@@ -76,6 +81,10 @@ import org.apache.rocketmq.tools.command.topic.UpdateTopicPermSubCommand;
import org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand;
import org.slf4j.LoggerFactory;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import ch.qos.logback.core.joran.spi.JoranException;
public class MQAdminStartup {
protected static List<SubCommand> subCommandList = new ArrayList<SubCommand>();
......@@ -129,7 +138,7 @@ public class MQAdminStartup {
System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr);
}
cmd.execute(commandLine, options, rpcHook);
cmd.execute(commandLine, options, getAclRPCHook(commandLine));
} else {
System.out.printf("The sub command %s not exist.%n", args[0]);
}
......@@ -211,7 +220,7 @@ public class MQAdminStartup {
private static void printHelp() {
System.out.printf("The most commonly used mqadmin commands are:%n");
System.out.println("ROCKETMQ_HOME Add tools.properties to the %ROCKETMQ_HOME%/conf/ directory or add -account xxxx -password xxxx Join when executing a command");
for (SubCommand cmd : subCommandList) {
System.out.printf(" %-20s %s%n", cmd.commandName(), cmd.commandDesc());
}
......@@ -243,4 +252,63 @@ public class MQAdminStartup {
public static void initCommand(SubCommand command) {
subCommandList.add(command);
}
public static RPCHook getAclRPCHook(CommandLine commandLine) {
String account=null ,password = null;
if(commandLine.hasOption("account")) {
account = commandLine.getOptionValue("account");
password = commandLine.getOptionValue("password");
}else {
String fileHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY,System.getenv(MixAll.ROCKETMQ_HOME_ENV));
File file = new File(fileHome+"/conf/tools.properties");
if(!file.exists()) {
System.out.println("no find tools.properties , , Execution may fail without account andd password");
System.out.println("ROCKETMQ_HOME Add tools.properties to the %ROCKETMQ_HOME%/conf/ directory or add -account xxxx -password xxxx Join when executing a command");
return null;
}
InputStream in=null;
try {
in = new BufferedInputStream(new FileInputStream(file));
Properties properties = new Properties();
properties.load(in);
account = properties.getProperty("account");
password = properties.getProperty("password");
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}finally {
if(in != null) {
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
if(StringUtils.isNotBlank(account) && StringUtils.isNotBlank(password) ) {
final String newAccount = account;
final String newPassword = password;
return new RPCHook() {
@Override
public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
HashMap<String, String> ext = request.getExtFields();
if (ext == null) {
ext = new HashMap<>();
request.setExtFields(ext);
}
ext.put("account", newAccount);
ext.put("password", newPassword);
}
@Override
public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {}
};
}
System.out.println("account andd password data incorrectness , Execution may fail without account andd password");
System.out.println("ROCKETMQ_HOME Add tools.properties to the %ROCKETMQ_HOME%/conf/ directory or add -account xxxx -password xxxx Join when executing a command");
return null;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册