提交 180a7dbd 编写于 作者: W wangshaojie4039 提交者: Zhendong Liu

[ISSUE#403] fix acl config file watch bug,clean and optimize the codes for acl feature. (#651)

* [ISSUE#403] fix acl config file watch bug,clean and optimize the codes for acl feature.

* [ISSUE#403] fix acl config file watch bug,clean and optimize the codes for acl feature.

* [ISSUE#403]fix lock/unlock issue,and optimize the codes.
上级 8afd3bf9
......@@ -37,6 +37,10 @@
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-common</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-srvutil</artifactId>
</dependency>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
......
......@@ -19,28 +19,20 @@ package org.apache.rocketmq.acl.plain;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import java.io.File;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.rocketmq.acl.common.AclException;
import org.apache.rocketmq.acl.common.AclUtils;
import org.apache.rocketmq.acl.common.Permission;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.srvutil.FileWatchService;
public class PlainPermissionLoader {
......@@ -48,25 +40,31 @@ public class PlainPermissionLoader {
private static final String DEFAULT_PLAIN_ACL_FILE = "/conf/plain_acl.yml";
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private String fileHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY,
System.getenv(MixAll.ROCKETMQ_HOME_ENV));
private String fileName = System.getProperty("rocketmq.acl.plain.file", DEFAULT_PLAIN_ACL_FILE);
private Map<String/** AccessKey **/, PlainAccessResource> plainAccessResourceMap = new HashMap<>();
private Map<String/** AccessKey **/, PlainAccessResource> plainAccessResourceMap = new HashMap<>();
private List<RemoteAddressStrategy> globalWhiteRemoteAddressStrategy = new ArrayList<>();
private List<RemoteAddressStrategy> globalWhiteRemoteAddressStrategy = new ArrayList<>();
private RemoteAddressStrategyFactory remoteAddressStrategyFactory = new RemoteAddressStrategyFactory();
private boolean isWatchStart;
public PlainPermissionLoader() {
initialize();
load();
watch();
}
public void initialize() {
public void load() {
Map<String, PlainAccessResource> plainAccessResourceMap = new HashMap<>();
List<RemoteAddressStrategy> globalWhiteRemoteAddressStrategy = new ArrayList<>();
JSONObject plainAclConfData = AclUtils.getYamlDataObject(fileHome + File.separator + fileName,
JSONObject.class);
......@@ -77,92 +75,42 @@ public class PlainPermissionLoader {
JSONArray globalWhiteRemoteAddressesList = plainAclConfData.getJSONArray("globalWhiteRemoteAddresses");
if (globalWhiteRemoteAddressesList != null && !globalWhiteRemoteAddressesList.isEmpty()) {
for (int i = 0; i < globalWhiteRemoteAddressesList.size(); i++) {
addGlobalWhiteRemoteAddress(globalWhiteRemoteAddressesList.getString(i));
globalWhiteRemoteAddressStrategy.add(remoteAddressStrategyFactory.
getRemoteAddressStrategy(globalWhiteRemoteAddressesList.getString(i)));
}
}
JSONArray accounts = plainAclConfData.getJSONArray("accounts");
if (accounts != null && !accounts.isEmpty()) {
List<PlainAccessConfig> plainAccessList = accounts.toJavaList(PlainAccessConfig.class);
for (PlainAccessConfig plainAccess : plainAccessList) {
this.addPlainAccessResource(getPlainAccessResource(plainAccess));
List<PlainAccessConfig> plainAccessConfigList = accounts.toJavaList(PlainAccessConfig.class);
for (PlainAccessConfig plainAccessConfig : plainAccessConfigList) {
PlainAccessResource plainAccessResource = buildPlainAccessResource(plainAccessConfig);
plainAccessResourceMap.put(plainAccessResource.getAccessKey(),plainAccessResource);
}
}
this.globalWhiteRemoteAddressStrategy = globalWhiteRemoteAddressStrategy;
this.plainAccessResourceMap = plainAccessResourceMap;
}
private void watch() {
String version = System.getProperty("java.version");
String[] str = StringUtils.split(version, ".");
if (Integer.valueOf(str[1]) < 7) {
log.warn("Watch need jdk equal or greater than 1.7, current version is {}", str[1]);
return;
}
try {
int fileIndex = fileName.lastIndexOf("/") + 1;
String watchDirectory = fileName.substring(0, fileIndex);
final String watchFileName = fileName.substring(fileIndex);
log.info("watch directory is {} , watch file name is {} ", fileHome + File.separator + watchDirectory, watchFileName);
final WatchService watcher = FileSystems.getDefault().newWatchService();
Path p = Paths.get(fileHome + File.separator + watchDirectory);
p.register(watcher, StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.ENTRY_CREATE);
ServiceThread watcherServcie = new ServiceThread() {
public void run() {
while (true) {
try {
WatchKey watchKey = watcher.take();
List<WatchEvent<?>> watchEvents = watchKey.pollEvents();
for (WatchEvent<?> event : watchEvents) {
if (watchFileName.equals(event.context().toString())
&& (StandardWatchEventKinds.ENTRY_MODIFY.equals(event.kind())
|| StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind()))) {
log.info("{} make a difference change is : {}", watchFileName, event.toString());
//Clearing the info, may result in a non-available time
PlainPermissionLoader.this.clearPermissionInfo();
initialize();
}
}
watchKey.reset();
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
UtilAll.sleep(3000);
}
}
}
String watchFilePath = fileHome + fileName;
FileWatchService fileWatchService = new FileWatchService(new String[] {watchFilePath}, new FileWatchService.Listener() {
@Override
public String getServiceName() {
return "AclWatcherService";
public void onChanged(String path) {
log.info("The plain acl yml changed, reload the context");
load();
}
};
watcherServcie.start();
});
fileWatchService.start();
log.info("Succeed to start AclWatcherService");
this.isWatchStart = true;
} catch (IOException e) {
} catch (Exception e) {
log.error("Failed to start AclWatcherService", e);
}
}
PlainAccessResource getPlainAccessResource(PlainAccessConfig plainAccess) {
PlainAccessResource plainAccessResource = new PlainAccessResource();
plainAccessResource.setAccessKey(plainAccess.getAccessKey());
plainAccessResource.setSecretKey(plainAccess.getSecretKey());
plainAccessResource.setWhiteRemoteAddress(plainAccess.getWhiteRemoteAddress());
plainAccessResource.setAdmin(plainAccess.isAdmin());
plainAccessResource.setDefaultGroupPerm(Permission.parsePermFromString(plainAccess.getDefaultGroupPerm()));
plainAccessResource.setDefaultTopicPerm(Permission.parsePermFromString(plainAccess.getDefaultTopicPerm()));
Permission.parseResourcePerms(plainAccessResource, false, plainAccess.getGroupPerms());
Permission.parseResourcePerms(plainAccessResource, true, plainAccess.getTopicPerms());
return plainAccessResource;
}
void checkPerm(PlainAccessResource needCheckedAccess, PlainAccessResource ownedAccess) {
if (Permission.needAdminPerm(needCheckedAccess.getRequestCode()) && !ownedAccess.isAdmin()) {
throw new AclException(String.format("Need admin permission for request code=%d, but accessKey=%s is not", needCheckedAccess.getRequestCode(), ownedAccess.getAccessKey()));
......@@ -200,31 +148,32 @@ public class PlainPermissionLoader {
this.globalWhiteRemoteAddressStrategy.clear();
}
public void addPlainAccessResource(PlainAccessResource plainAccessResource) throws AclException {
if (plainAccessResource.getAccessKey() == null
|| plainAccessResource.getSecretKey() == null
|| plainAccessResource.getAccessKey().length() <= 6
|| plainAccessResource.getSecretKey().length() <= 6) {
public PlainAccessResource buildPlainAccessResource(PlainAccessConfig plainAccessConfig) throws AclException {
if (plainAccessConfig.getAccessKey() == null
|| plainAccessConfig.getSecretKey() == null
|| plainAccessConfig.getAccessKey().length() <= 6
|| plainAccessConfig.getSecretKey().length() <= 6) {
throw new AclException(String.format(
"The accessKey=%s and secretKey=%s cannot be null and length should longer than 6",
plainAccessResource.getAccessKey(), plainAccessResource.getSecretKey()));
plainAccessConfig.getAccessKey(), plainAccessConfig.getSecretKey()));
}
try {
RemoteAddressStrategy remoteAddressStrategy = remoteAddressStrategyFactory
.getRemoteAddressStrategy(plainAccessResource);
plainAccessResource.setRemoteAddressStrategy(remoteAddressStrategy);
PlainAccessResource plainAccessResource = new PlainAccessResource();
plainAccessResource.setAccessKey(plainAccessConfig.getAccessKey());
plainAccessResource.setSecretKey(plainAccessConfig.getSecretKey());
plainAccessResource.setWhiteRemoteAddress(plainAccessConfig.getWhiteRemoteAddress());
if (plainAccessResourceMap.containsKey(plainAccessResource.getAccessKey())) {
log.warn("Duplicate acl config for {}, the newly one may overwrite the old", plainAccessResource.getAccessKey());
}
plainAccessResourceMap.put(plainAccessResource.getAccessKey(), plainAccessResource);
} catch (Exception e) {
throw new AclException(String.format("Load plain access resource failed %s %s", e.getMessage(), plainAccessResource.toString()), e);
}
}
plainAccessResource.setAdmin(plainAccessConfig.isAdmin());
plainAccessResource.setDefaultGroupPerm(Permission.parsePermFromString(plainAccessConfig.getDefaultGroupPerm()));
plainAccessResource.setDefaultTopicPerm(Permission.parsePermFromString(plainAccessConfig.getDefaultTopicPerm()));
private void addGlobalWhiteRemoteAddress(String remoteAddresses) {
globalWhiteRemoteAddressStrategy.add(remoteAddressStrategyFactory.getRemoteAddressStrategy(remoteAddresses));
Permission.parseResourcePerms(plainAccessResource, false, plainAccessConfig.getGroupPerms());
Permission.parseResourcePerms(plainAccessResource, true, plainAccessConfig.getTopicPerms());
plainAccessResource.setRemoteAddressStrategy(remoteAddressStrategyFactory.
getRemoteAddressStrategy(plainAccessResource.getWhiteRemoteAddress()));
return plainAccessResource;
}
public void validate(PlainAccessResource plainAccessResource) {
......
......@@ -21,6 +21,7 @@ import java.util.HashSet;
import java.util.Set;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.AclException;
import org.apache.rocketmq.acl.common.AclUtils;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.common.protocol.RequestCode;
......@@ -229,5 +230,41 @@ public class PlainAccessValidatorTest {
plainAccessValidator.validate(accessResource);
}
@Test(expected = AclException.class)
public void validateNullAccessKeyTest() {
SessionCredentials sessionCredentials=new SessionCredentials();
sessionCredentials.setAccessKey("RocketMQ1");
sessionCredentials.setSecretKey("1234");
AclClientRPCHook aclClientRPCHook=new AclClientRPCHook(sessionCredentials);
SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader();
messageRequestHeader.setTopic("topicB");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader);
aclClientRPCHook.doBeforeRequest("", remotingCommand);
ByteBuffer buf = remotingCommand.encodeHeader();
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.1.1");
plainAccessValidator.validate(accessResource);
}
@Test(expected = AclException.class)
public void validateErrorSecretKeyTest() {
SessionCredentials sessionCredentials=new SessionCredentials();
sessionCredentials.setAccessKey("RocketMQ");
sessionCredentials.setSecretKey("1234");
AclClientRPCHook aclClientRPCHook=new AclClientRPCHook(sessionCredentials);
SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader();
messageRequestHeader.setTopic("topicB");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader);
aclClientRPCHook.doBeforeRequest("", remotingCommand);
ByteBuffer buf = remotingCommand.encodeHeader();
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.1.1");
plainAccessValidator.validate(accessResource);
}
}
......@@ -41,6 +41,7 @@ public class PlainPermissionLoaderTest {
PlainAccessResource ANYPlainAccessResource;
PlainAccessResource DENYPlainAccessResource;
PlainAccessResource plainAccessResource = new PlainAccessResource();
PlainAccessConfig plainAccessConfig = new PlainAccessConfig();
PlainAccessResource plainAccessResourceTwo = new PlainAccessResource();
Set<Integer> adminCode = new HashSet<>();
......@@ -88,24 +89,22 @@ public class PlainPermissionLoaderTest {
}
@Test
public void getPlainAccessResourceTest() {
PlainAccessResource plainAccessResource = new PlainAccessResource();
public void buildPlainAccessResourceTest() {
PlainAccessResource plainAccessResource = null;
PlainAccessConfig plainAccess = new PlainAccessConfig();
plainAccess.setAccessKey("RocketMQ");
plainAccessResource = plainPermissionLoader.getPlainAccessResource(plainAccess);
Assert.assertEquals(plainAccessResource.getAccessKey(), "RocketMQ");
plainAccess.setSecretKey("12345678");
plainAccessResource = plainPermissionLoader.getPlainAccessResource(plainAccess);
plainAccessResource = plainPermissionLoader.buildPlainAccessResource(plainAccess);
Assert.assertEquals(plainAccessResource.getAccessKey(), "RocketMQ");
Assert.assertEquals(plainAccessResource.getSecretKey(), "12345678");
plainAccess.setWhiteRemoteAddress("127.0.0.1");
plainAccessResource = plainPermissionLoader.getPlainAccessResource(plainAccess);
plainAccessResource = plainPermissionLoader.buildPlainAccessResource(plainAccess);
Assert.assertEquals(plainAccessResource.getWhiteRemoteAddress(), "127.0.0.1");
plainAccess.setAdmin(true);
plainAccessResource = plainPermissionLoader.getPlainAccessResource(plainAccess);
plainAccessResource = plainPermissionLoader.buildPlainAccessResource(plainAccess);
Assert.assertEquals(plainAccessResource.isAdmin(), true);
List<String> groups = new ArrayList<String>();
......@@ -113,7 +112,7 @@ public class PlainPermissionLoaderTest {
groups.add("groupB=PUB|SUB");
groups.add("groupC=PUB");
plainAccess.setGroupPerms(groups);
plainAccessResource = plainPermissionLoader.getPlainAccessResource(plainAccess);
plainAccessResource = plainPermissionLoader.buildPlainAccessResource(plainAccess);
Map<String, Byte> resourcePermMap = plainAccessResource.getResourcePermMap();
Assert.assertEquals(resourcePermMap.size(), 3);
......@@ -126,7 +125,7 @@ public class PlainPermissionLoaderTest {
topics.add("topicB=PUB|SUB");
topics.add("topicC=PUB");
plainAccess.setTopicPerms(topics);
plainAccessResource = plainPermissionLoader.getPlainAccessResource(plainAccess);
plainAccessResource = plainPermissionLoader.buildPlainAccessResource(plainAccess);
resourcePermMap = plainAccessResource.getResourcePermMap();
Assert.assertEquals(resourcePermMap.size(), 6);
......@@ -158,35 +157,41 @@ public class PlainPermissionLoaderTest {
plainPermissionLoader.checkPerm(plainAccessResource, ANYPlainAccessResource);
}
@Test(expected = AclException.class)
public void checkErrorPerm() {
plainAccessResource = new PlainAccessResource();
plainAccessResource.addResourceAndPerm("topicF", Permission.SUB);
plainPermissionLoader.checkPerm(plainAccessResource, SUBPlainAccessResource);
}
@Test(expected = AclException.class)
public void accountNullTest() {
plainAccessResource.setAccessKey(null);
plainPermissionLoader.addPlainAccessResource(plainAccessResource);
plainAccessConfig.setAccessKey(null);
plainPermissionLoader.buildPlainAccessResource(plainAccessConfig);
}
@Test(expected = AclException.class)
public void accountThanTest() {
plainAccessResource.setAccessKey("123");
plainPermissionLoader.addPlainAccessResource(plainAccessResource);
plainAccessConfig.setAccessKey("123");
plainPermissionLoader.buildPlainAccessResource(plainAccessConfig);
}
@Test(expected = AclException.class)
public void passWordtNullTest() {
plainAccessResource.setAccessKey(null);
plainPermissionLoader.addPlainAccessResource(plainAccessResource);
plainAccessConfig.setAccessKey(null);
plainPermissionLoader.buildPlainAccessResource(plainAccessConfig);
}
@Test(expected = AclException.class)
public void passWordThanTest() {
plainAccessResource.setAccessKey("123");
plainPermissionLoader.addPlainAccessResource(plainAccessResource);
plainAccessConfig.setAccessKey("123");
plainPermissionLoader.buildPlainAccessResource(plainAccessConfig);
}
@Test(expected = AclException.class)
public void testPlainAclPlugEngineInit() {
System.setProperty("rocketmq.home.dir", "");
new PlainPermissionLoader().initialize();
new PlainPermissionLoader().load();
}
@SuppressWarnings("unchecked")
......@@ -203,24 +208,20 @@ public class PlainPermissionLoaderTest {
@Test
public void isWatchStartTest() {
System.setProperty("java.version", "1.7.11");
PlainPermissionLoader plainPermissionLoader = new PlainPermissionLoader();
Assert.assertTrue(plainPermissionLoader.isWatchStart());
System.setProperty("java.version", "1.6.11");
plainPermissionLoader = new PlainPermissionLoader();
Assert.assertFalse(plainPermissionLoader.isWatchStart());
}
@Test
public void testWatch() throws IOException, IllegalAccessException {
System.setProperty("java.version", "1.7.11");
System.setProperty("rocketmq.home.dir", "src/test/resources/conf");
System.setProperty("rocketmq.acl.plain.file", "watch/plain_acl_watch.yml");
String fileName = "src/test/resources/conf/watch/plain_acl_watch.yml";
public void testWatch() throws IOException, IllegalAccessException ,InterruptedException{
System.setProperty("rocketmq.home.dir", "src/test/resources");
System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl-test.yml");
String fileName =System.getProperty("rocketmq.home.dir", "src/test/resources")+System.getProperty("rocketmq.acl.plain.file", "/conf/plain_acl.yml");
File transport = new File(fileName);
transport.delete();
transport.createNewFile();
FileWriter writer = new FileWriter(transport);
writer.write("accounts:\r\n");
writer.write("- accessKey: watchrocketmq\r\n");
......@@ -250,7 +251,7 @@ public class PlainPermissionLoaderTest {
writer.flush();
writer.close();
UtilAll.sleep(1000);
Thread.sleep(1000);
{
Map<String, PlainAccessResource> plainAccessResourceMap = (Map<String, PlainAccessResource>) FieldUtils.readDeclaredField(plainPermissionLoader, "plainAccessResourceMap", true);
PlainAccessResource accessResource = plainAccessResourceMap.get("watchrocketmq1");
......@@ -260,6 +261,8 @@ public class PlainPermissionLoaderTest {
}
transport.delete();
System.setProperty("rocketmq.home.dir", "src/test/resources");
System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl.yml");
}
@Test(expected = AclException.class)
......
accounts:
- accessKey: watchrocketmq
secretKey: 12345678
whiteRemoteAddress: 127.0.0.1
admin: true
- accessKey: watchrocketmq1
secretKey: 88888888
whiteRemoteAddress: 127.0.0.1
admin: false
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
## suggested format
accounts:
- accessKey: watchrocketmq
secretKey: 12345678
whiteRemoteAddress: 127.0.0.1
admin: true
- accessKey: watchrocketmq1
secretKey: 88888888
whiteRemoteAddress: 127.0.0.1
admin: false
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册