diff --git a/acl/src/main/java/org/apache/rocketmq/acl/AccessValidator.java b/acl/src/main/java/org/apache/rocketmq/acl/AccessValidator.java index c915cf35d1c7b8492cdea27c769c59cd98477b15..b87cc2fa44bc9e93a34020d189bd8d084fa24401 100644 --- a/acl/src/main/java/org/apache/rocketmq/acl/AccessValidator.java +++ b/acl/src/main/java/org/apache/rocketmq/acl/AccessValidator.java @@ -17,9 +17,12 @@ package org.apache.rocketmq.acl; +import java.util.List; +import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.remoting.protocol.RemotingCommand; public interface AccessValidator { + /** * Parse to get the AccessResource(user, resource, needed permission) * @@ -35,4 +38,32 @@ public interface AccessValidator { * @param accessResource */ void validate(AccessResource accessResource); + + /** + * Update the access resource config + * + * @param plainAccessConfig + * @return + */ + boolean updateAccessConfig(PlainAccessConfig plainAccessConfig); + + /** + * Delete the access resource config + * + * @return + */ + boolean deleteAccessConfig(String accesskey); + + /** + * Get the access resource config version information + * + * @return + */ + String getAclConfigVersion(); + + /** + * Update globalWhiteRemoteAddresses in acl yaml config file + * @return + */ + boolean updateGlobalWhiteAddrsConfig(List globalWhiteAddrsList); } diff --git a/acl/src/main/java/org/apache/rocketmq/acl/common/AclConstants.java b/acl/src/main/java/org/apache/rocketmq/acl/common/AclConstants.java new file mode 100644 index 0000000000000000000000000000000000000000..bfe96f53037cca259b837a461114f879b2b7359c --- /dev/null +++ b/acl/src/main/java/org/apache/rocketmq/acl/common/AclConstants.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.acl.common; + +public class AclConstants { + + public static final String CONFIG_GLOBAL_WHITE_ADDRS = "globalWhiteRemoteAddresses"; + + public static final String CONFIG_ACCOUNTS = "accounts"; + + public static final String CONFIG_ACCESS_KEY = "accessKey"; + + public static final String CONFIG_SECRET_KEY = "secretKey"; + + public static final String CONFIG_WHITE_ADDR = "whiteRemoteAddress"; + + public static final String CONFIG_ADMIN_ROLE = "admin"; + + public static final String CONFIG_DEFAULT_TOPIC_PERM = "defaultTopicPerm"; + + public static final String CONFIG_DEFAULT_GROUP_PERM = "defaultGroupPerm"; + + public static final String CONFIG_TOPIC_PERMS = "topicPerms"; + + public static final String CONFIG_GROUP_PERMS = "groupPerms"; + + public static final String CONFIG_DATA_VERSION = "dataVersion"; + + public static final String CONFIG_COUNTER = "counter"; + + public static final String CONFIG_TIME_STAMP = "timestamp"; + + public static final int ACCESS_KEY_MIN_LENGTH = 6; + + public static final int SECRET_KEY_MIN_LENGTH = 6; +} diff --git a/acl/src/main/java/org/apache/rocketmq/acl/common/AclUtils.java b/acl/src/main/java/org/apache/rocketmq/acl/common/AclUtils.java index 39f75a3bd7ae870e7164603000f824fafefb707c..20e1cfa26b90916aecfec39e2669179c85854e20 100644 --- a/acl/src/main/java/org/apache/rocketmq/acl/common/AclUtils.java +++ b/acl/src/main/java/org/apache/rocketmq/acl/common/AclUtils.java @@ -20,7 +20,9 @@ import com.alibaba.fastjson.JSONObject; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; +import java.io.FileWriter; import java.io.IOException; +import java.io.PrintWriter; import java.util.Map; import java.util.SortedMap; import org.apache.commons.lang3.StringUtils; @@ -48,7 +50,7 @@ public class AclUtils { return AclUtils.combineBytes(sb.toString().getBytes(CHARSET), request.getBody()); } catch (Exception e) { - throw new RuntimeException("incompatible exception.", e); + throw new RuntimeException("Incompatible exception.", e); } } @@ -69,7 +71,7 @@ public class AclUtils { public static void verify(String netaddress, int index) { if (!AclUtils.isScope(netaddress, index)) { - throw new AclException(String.format("netaddress examine scope Exception netaddress is %s", netaddress)); + throw new AclException(String.format("Netaddress examine scope Exception netaddress is %s", netaddress)); } } @@ -127,11 +129,11 @@ public class AclUtils { } public static T getYamlDataObject(String path, Class clazz) { - Yaml ymal = new Yaml(); + Yaml yaml = new Yaml(); FileInputStream fis = null; try { fis = new FileInputStream(new File(path)); - return ymal.loadAs(fis, clazz); + return yaml.loadAs(fis, clazz); } catch (FileNotFoundException ignore) { return null; } catch (Exception e) { @@ -146,13 +148,31 @@ public class AclUtils { } } + public static boolean writeDataObject(String path, Map dataMap) { + Yaml yaml = new Yaml(); + PrintWriter pw = null; + try { + pw = new PrintWriter(new FileWriter(path)); + String dumpAsMap = yaml.dumpAsMap(dataMap); + pw.print(dumpAsMap); + pw.flush(); + } catch (Exception e) { + throw new AclException(e.getMessage()); + } finally { + if (pw != null) { + pw.close(); + } + } + return true; + } + public static RPCHook getAclRPCHook(String fileName) { JSONObject yamlDataObject = null; try { yamlDataObject = AclUtils.getYamlDataObject(fileName, JSONObject.class); } catch (Exception e) { - log.error("convert yaml file to data object error, ",e); + log.error("Convert yaml file to data object error, ",e); return null; } @@ -161,8 +181,8 @@ public class AclUtils { return null; } - String accessKey = yamlDataObject.getString("accessKey"); - String secretKey = yamlDataObject.getString("secretKey"); + String accessKey = yamlDataObject.getString(AclConstants.CONFIG_ACCESS_KEY); + String secretKey = yamlDataObject.getString(AclConstants.CONFIG_SECRET_KEY); if (StringUtils.isBlank(accessKey) || StringUtils.isBlank(secretKey)) { log.warn("AccessKey or secretKey is blank, the acl is not enabled."); diff --git a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java index 00072e8e2e2349dc3ddcb4ab377e824a3d01c3c4..a0cceed8c31cf18102ed5f44903f92f78a157a64 100644 --- a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java +++ b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java @@ -43,7 +43,7 @@ public class PlainAccessResource implements AccessResource { private int requestCode; - //the content to calculate the content + // The content to calculate the content private byte[] content; private String signature; diff --git a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessValidator.java b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessValidator.java index 11b2a43f2a618a4f083c4cab7bf17a1edef31392..c8ce23908484c1f56c9576c6ee97c7b907227539 100644 --- a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessValidator.java +++ b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessValidator.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.acl.plain; +import java.util.List; import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; @@ -25,6 +26,7 @@ import org.apache.rocketmq.acl.common.AclException; import org.apache.rocketmq.acl.common.AclUtils; import org.apache.rocketmq.acl.common.Permission; import org.apache.rocketmq.acl.common.SessionCredentials; +import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader; import org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader; @@ -38,10 +40,10 @@ import static org.apache.rocketmq.acl.plain.PlainAccessResource.getRetryTopic; public class PlainAccessValidator implements AccessValidator { - private PlainPermissionLoader aclPlugEngine; + private PlainPermissionManager aclPlugEngine; public PlainAccessValidator() { - aclPlugEngine = new PlainPermissionLoader(); + aclPlugEngine = new PlainPermissionManager(); } @Override @@ -56,8 +58,8 @@ public class PlainAccessValidator implements AccessValidator { accessResource.setRequestCode(request.getCode()); if (request.getExtFields() == null) { - //If request's extFields is null,then return accessResource directly(users can use whiteAddress pattern) - //The following logic codes depend on the request's extFields not to be null. + // If request's extFields is null,then return accessResource directly(users can use whiteAddress pattern) + // The following logic codes depend on the request's extFields not to be null. return accessResource; } accessResource.setAccessKey(request.getExtFields().get(SessionCredentials.ACCESS_KEY)); @@ -135,4 +137,22 @@ public class PlainAccessValidator implements AccessValidator { aclPlugEngine.validate((PlainAccessResource) accessResource); } + @Override + public boolean updateAccessConfig(PlainAccessConfig plainAccessConfig) { + return aclPlugEngine.updateAccessConfig(plainAccessConfig); + } + + @Override + public boolean deleteAccessConfig(String accesskey) { + return aclPlugEngine.deleteAccessConfig(accesskey); + } + + @Override public String getAclConfigVersion() { + return aclPlugEngine.getAclConfigDataVersion(); + } + + @Override public boolean updateGlobalWhiteAddrsConfig(List globalWhiteAddrsList) { + return aclPlugEngine.updateGlobalWhiteAddrsConfig(globalWhiteAddrsList); + } + } diff --git a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionLoader.java b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionManager.java similarity index 53% rename from acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionLoader.java rename to acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionManager.java index e97825dd14094fe9eee3fb8121a52c55877d5b37..fc7f0f3fdb81c8b1257d915f522e544889e11d52 100644 --- a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionLoader.java +++ b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionManager.java @@ -21,27 +21,29 @@ import com.alibaba.fastjson.JSONObject; import java.io.File; import java.util.ArrayList; import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.acl.common.AclConstants; import org.apache.rocketmq.acl.common.AclException; import org.apache.rocketmq.acl.common.AclUtils; import org.apache.rocketmq.acl.common.Permission; +import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.srvutil.FileWatchService; -public class PlainPermissionLoader { +public class PlainPermissionManager { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); private static final String DEFAULT_PLAIN_ACL_FILE = "/conf/plain_acl.yml"; - private final ReadWriteLock lock = new ReentrantReadWriteLock(); - private String fileHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); @@ -55,7 +57,9 @@ public class PlainPermissionLoader { private boolean isWatchStart; - public PlainPermissionLoader() { + private final DataVersion dataVersion = new DataVersion(); + + public PlainPermissionManager() { load(); watch(); } @@ -67,7 +71,6 @@ public class PlainPermissionLoader { JSONObject plainAclConfData = AclUtils.getYamlDataObject(fileHome + File.separator + fileName, JSONObject.class); - if (plainAclConfData == null || plainAclConfData.isEmpty()) { throw new AclException(String.format("%s file is not data", fileHome + File.separator + fileName)); } @@ -80,7 +83,7 @@ public class PlainPermissionLoader { } } - JSONArray accounts = plainAclConfData.getJSONArray("accounts"); + JSONArray accounts = plainAclConfData.getJSONArray(AclConstants.CONFIG_ACCOUNTS); if (accounts != null && !accounts.isEmpty()) { List plainAccessConfigList = accounts.toJavaList(PlainAccessConfig.class); for (PlainAccessConfig plainAccessConfig : plainAccessConfigList) { @@ -89,10 +92,184 @@ public class PlainPermissionLoader { } } + // For loading dataversion part just + JSONArray tempDataVersion = plainAclConfData.getJSONArray(AclConstants.CONFIG_DATA_VERSION); + if (tempDataVersion != null && !tempDataVersion.isEmpty()) { + List dataVersion = tempDataVersion.toJavaList(DataVersion.class); + DataVersion firstElement = dataVersion.get(0); + this.dataVersion.assignNewOne(firstElement); + } + this.globalWhiteRemoteAddressStrategy = globalWhiteRemoteAddressStrategy; this.plainAccessResourceMap = plainAccessResourceMap; } + public String getAclConfigDataVersion() { + return this.dataVersion.toJson(); + } + + private Map updateAclConfigFileVersion(Map updateAclConfigMap) { + + dataVersion.nextVersion(); + List> versionElement = new ArrayList>(); + Map accountsMap = new LinkedHashMap() { + { + put(AclConstants.CONFIG_COUNTER, dataVersion.getCounter().longValue()); + put(AclConstants.CONFIG_TIME_STAMP, dataVersion.getTimestamp()); + } + }; + versionElement.add(accountsMap); + updateAclConfigMap.put(AclConstants.CONFIG_DATA_VERSION, versionElement); + return updateAclConfigMap; + } + + public boolean updateAccessConfig(PlainAccessConfig plainAccessConfig) { + + if (plainAccessConfig == null) { + log.error("Parameter value plainAccessConfig is null,Please check your parameter"); + return false; + } + + Map aclAccessConfigMap = AclUtils.getYamlDataObject(fileHome + File.separator + fileName, + Map.class); + + List> accounts = (List>) aclAccessConfigMap.get(AclConstants.CONFIG_ACCOUNTS); + Map updateAccountMap = null; + if (accounts != null) { + for (Map account : accounts) { + if (account.get(AclConstants.CONFIG_ACCESS_KEY).equals(plainAccessConfig.getAccessKey())) { + // Update acl access config elements + accounts.remove(account); + updateAccountMap = createAclAccessConfigMap(account, plainAccessConfig); + accounts.add(updateAccountMap); + aclAccessConfigMap.put(AclConstants.CONFIG_ACCOUNTS, accounts); + + if (AclUtils.writeDataObject(fileHome + File.separator + fileName, updateAclConfigFileVersion(aclAccessConfigMap))) { + return true; + } + return false; + } + } + // Create acl access config elements + accounts.add(createAclAccessConfigMap(null, plainAccessConfig)); + aclAccessConfigMap.put(AclConstants.CONFIG_ACCOUNTS, accounts); + if (AclUtils.writeDataObject(fileHome + File.separator + fileName, updateAclConfigFileVersion(aclAccessConfigMap))) { + return true; + } + return false; + } + + log.error("Users must ensure that the acl yaml config file has accounts node element"); + return false; + } + + private Map createAclAccessConfigMap(Map existedAccoutMap, PlainAccessConfig plainAccessConfig) { + + + Map newAccountsMap = null; + if (existedAccoutMap == null) { + newAccountsMap = new LinkedHashMap(); + } else { + newAccountsMap = existedAccoutMap; + } + + if (StringUtils.isEmpty(plainAccessConfig.getAccessKey()) || + plainAccessConfig.getAccessKey().length() <= AclConstants.ACCESS_KEY_MIN_LENGTH) { + throw new AclException(String.format( + "The accessKey=%s cannot be null and length should longer than 6", + plainAccessConfig.getAccessKey())); + } + newAccountsMap.put(AclConstants.CONFIG_ACCESS_KEY, plainAccessConfig.getAccessKey()); + + if (!StringUtils.isEmpty(plainAccessConfig.getSecretKey())) { + if (plainAccessConfig.getSecretKey().length() <= AclConstants.SECRET_KEY_MIN_LENGTH) { + throw new AclException(String.format( + "The secretKey=%s value length should longer than 6", + plainAccessConfig.getSecretKey())); + } + newAccountsMap.put(AclConstants.CONFIG_SECRET_KEY, (String) plainAccessConfig.getSecretKey()); + } + if (!StringUtils.isEmpty(plainAccessConfig.getWhiteRemoteAddress())) { + newAccountsMap.put(AclConstants.CONFIG_WHITE_ADDR, plainAccessConfig.getWhiteRemoteAddress()); + } + if (!StringUtils.isEmpty(String.valueOf(plainAccessConfig.isAdmin()))) { + newAccountsMap.put(AclConstants.CONFIG_ADMIN_ROLE, plainAccessConfig.isAdmin()); + } + if (!StringUtils.isEmpty(plainAccessConfig.getDefaultTopicPerm())) { + newAccountsMap.put(AclConstants.CONFIG_DEFAULT_TOPIC_PERM, plainAccessConfig.getDefaultTopicPerm()); + } + if (!StringUtils.isEmpty(plainAccessConfig.getDefaultGroupPerm())) { + newAccountsMap.put(AclConstants.CONFIG_DEFAULT_GROUP_PERM, plainAccessConfig.getDefaultGroupPerm()); + } + if (plainAccessConfig.getTopicPerms() != null && !plainAccessConfig.getTopicPerms().isEmpty()) { + newAccountsMap.put(AclConstants.CONFIG_TOPIC_PERMS, plainAccessConfig.getTopicPerms()); + } + if (plainAccessConfig.getGroupPerms() != null && !plainAccessConfig.getGroupPerms().isEmpty()) { + newAccountsMap.put(AclConstants.CONFIG_GROUP_PERMS, plainAccessConfig.getGroupPerms()); + } + + return newAccountsMap; + } + + public boolean deleteAccessConfig(String accesskey) { + if (StringUtils.isEmpty(accesskey)) { + log.error("Parameter value accesskey is null or empty String,Please check your parameter"); + return false; + } + + Map aclAccessConfigMap = AclUtils.getYamlDataObject(fileHome + File.separator + fileName, + Map.class); + + List> accounts = (List>) aclAccessConfigMap.get("accounts"); + if (accounts != null) { + Iterator> itemIterator = accounts.iterator(); + while (itemIterator.hasNext()) { + + if (itemIterator.next().get(AclConstants.CONFIG_ACCESS_KEY).equals(accesskey)) { + // Delete the related acl config element + itemIterator.remove(); + aclAccessConfigMap.put(AclConstants.CONFIG_ACCOUNTS, accounts); + + if (AclUtils.writeDataObject(fileHome + File.separator + fileName, updateAclConfigFileVersion(aclAccessConfigMap))) { + return true; + } + return false; + } + } + } + log.error("Users must ensure that the acl yaml config file has related acl config elements"); + + return false; + } + + public boolean updateGlobalWhiteAddrsConfig(List globalWhiteAddrsList) { + + if (globalWhiteAddrsList == null) { + log.error("Parameter value globalWhiteAddrsList is null,Please check your parameter"); + return false; + } + + Map aclAccessConfigMap = AclUtils.getYamlDataObject(fileHome + File.separator + fileName, + Map.class); + + List globalWhiteRemoteAddrList = (List) aclAccessConfigMap.get(AclConstants.CONFIG_GLOBAL_WHITE_ADDRS); + + if (globalWhiteRemoteAddrList != null) { + globalWhiteRemoteAddrList.clear(); + globalWhiteRemoteAddrList.addAll(globalWhiteAddrsList); + + // Update globalWhiteRemoteAddr element in memeory map firstly + aclAccessConfigMap.put(AclConstants.CONFIG_GLOBAL_WHITE_ADDRS,globalWhiteRemoteAddrList); + if (AclUtils.writeDataObject(fileHome + File.separator + fileName, updateAclConfigFileVersion(aclAccessConfigMap))) { + return true; + } + return false; + } + + log.error("Users must ensure that the acl yaml config file has globalWhiteRemoteAddresses flag firstly"); + return false; + } + private void watch() { try { String watchFilePath = fileHome + fileName; @@ -156,8 +333,8 @@ public class PlainPermissionLoader { public PlainAccessResource buildPlainAccessResource(PlainAccessConfig plainAccessConfig) throws AclException { if (plainAccessConfig.getAccessKey() == null || plainAccessConfig.getSecretKey() == null - || plainAccessConfig.getAccessKey().length() <= 6 - || plainAccessConfig.getSecretKey().length() <= 6) { + || plainAccessConfig.getAccessKey().length() <= AclConstants.ACCESS_KEY_MIN_LENGTH + || plainAccessConfig.getSecretKey().length() <= AclConstants.SECRET_KEY_MIN_LENGTH) { throw new AclException(String.format( "The accessKey=%s and secretKey=%s cannot be null and length should longer than 6", plainAccessConfig.getAccessKey(), plainAccessConfig.getSecretKey())); @@ -217,89 +394,4 @@ public class PlainPermissionLoader { public boolean isWatchStart() { return isWatchStart; } - - static class PlainAccessConfig { - - private String accessKey; - - private String secretKey; - - private String whiteRemoteAddress; - - private boolean admin; - - private String defaultTopicPerm; - - private String defaultGroupPerm; - - private List topicPerms; - - private List groupPerms; - - public String getAccessKey() { - return accessKey; - } - - public void setAccessKey(String accessKey) { - this.accessKey = accessKey; - } - - public String getSecretKey() { - return secretKey; - } - - public void setSecretKey(String secretKey) { - this.secretKey = secretKey; - } - - public String getWhiteRemoteAddress() { - return whiteRemoteAddress; - } - - public void setWhiteRemoteAddress(String whiteRemoteAddress) { - this.whiteRemoteAddress = whiteRemoteAddress; - } - - public boolean isAdmin() { - return admin; - } - - public void setAdmin(boolean admin) { - this.admin = admin; - } - - public String getDefaultTopicPerm() { - return defaultTopicPerm; - } - - public void setDefaultTopicPerm(String defaultTopicPerm) { - this.defaultTopicPerm = defaultTopicPerm; - } - - public String getDefaultGroupPerm() { - return defaultGroupPerm; - } - - public void setDefaultGroupPerm(String defaultGroupPerm) { - this.defaultGroupPerm = defaultGroupPerm; - } - - public List getTopicPerms() { - return topicPerms; - } - - public void setTopicPerms(List topicPerms) { - this.topicPerms = topicPerms; - } - - public List getGroupPerms() { - return groupPerms; - } - - public void setGroupPerms(List groupPerms) { - this.groupPerms = groupPerms; - } - - } - } diff --git a/acl/src/test/java/org/apache/rocketmq/acl/common/AclUtilsTest.java b/acl/src/test/java/org/apache/rocketmq/acl/common/AclUtilsTest.java index 4883afa657c2f350ca9c0338dc904575271b435c..5b2627de85dc66f13f1868b7b0c5e014aba3117f 100644 --- a/acl/src/test/java/org/apache/rocketmq/acl/common/AclUtilsTest.java +++ b/acl/src/test/java/org/apache/rocketmq/acl/common/AclUtilsTest.java @@ -17,7 +17,11 @@ package org.apache.rocketmq.acl.common; import com.alibaba.fastjson.JSONObject; +import java.io.File; +import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import org.apache.commons.lang3.StringUtils; @@ -131,10 +135,77 @@ public class AclUtilsTest { @Test public void getYamlDataObjectTest() { - Map map = AclUtils.getYamlDataObject("src/test/resources/conf/plain_acl.yml", Map.class); + Map map = AclUtils.getYamlDataObject("src/test/resources/conf/plain_acl_correct.yml", Map.class); Assert.assertFalse(map.isEmpty()); } + @Test + public void writeDataObject2YamlFileTest() throws IOException{ + + String targetFileName = "src/test/resources/conf/plain_write_acl.yml"; + File transport = new File(targetFileName); + transport.delete(); + transport.createNewFile(); + + Map aclYamlMap = new HashMap(); + + // For globalWhiteRemoteAddrs element in acl yaml config file + List globalWhiteRemoteAddrs = new ArrayList(); + globalWhiteRemoteAddrs.add("10.10.103.*"); + globalWhiteRemoteAddrs.add("192.168.0.*"); + aclYamlMap.put("globalWhiteRemoteAddrs",globalWhiteRemoteAddrs); + + // For accounts element in acl yaml config file + List> accounts = new ArrayList>(); + Map accountsMap = new LinkedHashMap() { + { + put("accessKey", "RocketMQ"); + put("secretKey", "12345678"); + put("whiteRemoteAddress", "whiteRemoteAddress"); + put("admin", "true"); + } + }; + accounts.add(accountsMap); + aclYamlMap.put("accounts",accounts); + Assert.assertTrue(AclUtils.writeDataObject(targetFileName, aclYamlMap)); + + transport.delete(); + } + + @Test + public void updateExistedYamlFileTest() throws IOException{ + + String targetFileName = "src/test/resources/conf/plain_update_acl.yml"; + File transport = new File(targetFileName); + transport.delete(); + transport.createNewFile(); + + Map aclYamlMap = new HashMap(); + + // For globalWhiteRemoteAddrs element in acl yaml config file + List globalWhiteRemoteAddrs = new ArrayList(); + globalWhiteRemoteAddrs.add("10.10.103.*"); + globalWhiteRemoteAddrs.add("192.168.0.*"); + aclYamlMap.put("globalWhiteRemoteAddrs",globalWhiteRemoteAddrs); + + // Write file to yaml file + AclUtils.writeDataObject(targetFileName, aclYamlMap); + + Map updatedMap = AclUtils.getYamlDataObject(targetFileName, Map.class); + List globalWhiteRemoteAddrList = (List) updatedMap.get("globalWhiteRemoteAddrs"); + globalWhiteRemoteAddrList.clear(); + globalWhiteRemoteAddrList.add("192.168.1.2"); + + // Update file and flush to yaml file + AclUtils.writeDataObject(targetFileName, updatedMap); + + Map readableMap = AclUtils.getYamlDataObject(targetFileName, Map.class); + List updatedGlobalWhiteRemoteAddrs = (List) readableMap.get("globalWhiteRemoteAddrs"); + Assert.assertEquals("192.168.1.2",updatedGlobalWhiteRemoteAddrs.get(0)); + + transport.delete(); + } + @Test public void getYamlDataIgnoreFileNotFoundExceptionTest() { diff --git a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java index b7cdb69aeedf9e2ca7cefc54061f5d8f5af85bd7..bca90756193a6c57cbb9c4952fcdad80aadb11b9 100644 --- a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java +++ b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java @@ -16,14 +16,20 @@ */ package org.apache.rocketmq.acl.plain; + import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.rocketmq.acl.common.AclClientRPCHook; +import org.apache.rocketmq.acl.common.AclConstants; import org.apache.rocketmq.acl.common.AclException; import org.apache.rocketmq.acl.common.AclUtils; import org.apache.rocketmq.acl.common.SessionCredentials; +import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.header.*; import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData; @@ -297,4 +303,262 @@ public class PlainAccessValidatorTest { PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), whiteRemoteAddress); plainAccessValidator.validate(accessResource); } + + @Test + public void updateAccessAclYamlConfigNormalTest() { + System.setProperty("rocketmq.home.dir", "src/test/resources"); + System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl_update_create.yml"); + + String targetFileName = "src/test/resources/conf/plain_acl_update_create.yml"; + Map backUpAclConfigMap = AclUtils.getYamlDataObject(targetFileName, Map.class); + + PlainAccessConfig plainAccessConfig = new PlainAccessConfig(); + plainAccessConfig.setAccessKey("RocketMQ"); + plainAccessConfig.setSecretKey("1234567890"); + plainAccessConfig.setDefaultGroupPerm("PUB"); + plainAccessConfig.setDefaultTopicPerm("SUB"); + List topicPerms = new ArrayList(); + topicPerms.add("topicC=PUB|SUB"); + topicPerms.add("topicB=PUB"); + plainAccessConfig.setTopicPerms(topicPerms); + List groupPerms = new ArrayList(); + groupPerms.add("groupB=PUB|SUB"); + groupPerms.add("groupC=DENY"); + plainAccessConfig.setGroupPerms(groupPerms); + + PlainAccessValidator plainAccessValidator = new PlainAccessValidator(); + // Update acl access yaml config file + plainAccessValidator.updateAccessConfig(plainAccessConfig); + + Map readableMap = AclUtils.getYamlDataObject(targetFileName, Map.class); + List> accounts = (List>)readableMap.get("accounts"); + Map verifyMap = null; + for (Map account : accounts) { + if (account.get("accessKey").equals(plainAccessConfig.getAccessKey())) { + verifyMap = account; + break; + } + } + + Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_SECRET_KEY),"1234567890"); + Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_DEFAULT_TOPIC_PERM),"SUB"); + Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_DEFAULT_GROUP_PERM),"PUB"); + Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_ADMIN_ROLE),false); + Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_WHITE_ADDR),"192.168.0.*"); + Assert.assertEquals(((List)verifyMap.get(AclConstants.CONFIG_TOPIC_PERMS)).size(),2); + Assert.assertEquals(((List)verifyMap.get(AclConstants.CONFIG_GROUP_PERMS)).size(),2); + + // Verify the dateversion element is correct or not + List> dataVersions = (List>) readableMap.get("dataVersion"); + Assert.assertEquals(1,dataVersions.get(0).get("counter")); + + // Restore the backup file and flush to yaml file + AclUtils.writeDataObject(targetFileName, backUpAclConfigMap); + } + + @Test + public void updateAccessAclYamlConfigTest() { + System.setProperty("rocketmq.home.dir", "src/test/resources"); + System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl_update_create.yml"); + + String targetFileName = "src/test/resources/conf/plain_acl_update_create.yml"; + Map backUpAclConfigMap = AclUtils.getYamlDataObject(targetFileName, Map.class); + + PlainAccessConfig plainAccessConfig = new PlainAccessConfig(); + plainAccessConfig.setAccessKey("RocketMQ"); + plainAccessConfig.setSecretKey("123456789111"); + + PlainAccessValidator plainAccessValidator = new PlainAccessValidator(); + // Update element in the acl access yaml config file + plainAccessValidator.updateAccessConfig(plainAccessConfig); + + Map readableMap = AclUtils.getYamlDataObject(targetFileName, Map.class); + List> accounts = (List>)readableMap.get(AclConstants.CONFIG_ACCOUNTS); + Map verifyMap = null; + for (Map account : accounts) { + if (account.get(AclConstants.CONFIG_ACCESS_KEY).equals(plainAccessConfig.getAccessKey())) { + verifyMap = account; + break; + } + } + Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_SECRET_KEY),"123456789111"); + + // Restore the backup file and flush to yaml file + AclUtils.writeDataObject(targetFileName, backUpAclConfigMap); + } + + + @Test + public void createAndUpdateAccessAclYamlConfigNormalTest() { + System.setProperty("rocketmq.home.dir", "src/test/resources"); + System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl_update_create.yml"); + + String targetFileName = "src/test/resources/conf/plain_acl_update_create.yml"; + Map backUpAclConfigMap = AclUtils.getYamlDataObject(targetFileName, Map.class); + + PlainAccessConfig plainAccessConfig = new PlainAccessConfig(); + plainAccessConfig.setAccessKey("RocketMQ33"); + plainAccessConfig.setSecretKey("123456789111"); + plainAccessConfig.setDefaultGroupPerm("PUB"); + plainAccessConfig.setDefaultTopicPerm("DENY"); + List topicPerms = new ArrayList(); + topicPerms.add("topicC=PUB|SUB"); + topicPerms.add("topicB=PUB"); + plainAccessConfig.setTopicPerms(topicPerms); + List groupPerms = new ArrayList(); + groupPerms.add("groupB=PUB|SUB"); + groupPerms.add("groupC=DENY"); + plainAccessConfig.setGroupPerms(groupPerms); + + PlainAccessValidator plainAccessValidator = new PlainAccessValidator(); + // Create element in the acl access yaml config file + plainAccessValidator.updateAccessConfig(plainAccessConfig); + + Map readableMap = AclUtils.getYamlDataObject(targetFileName, Map.class); + List> accounts = (List>)readableMap.get(AclConstants.CONFIG_ACCOUNTS); + Map verifyMap = null; + for (Map account : accounts) { + if (account.get(AclConstants.CONFIG_ACCESS_KEY).equals(plainAccessConfig.getAccessKey())) { + verifyMap = account; + break; + } + } + Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_SECRET_KEY),"123456789111"); + Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_DEFAULT_TOPIC_PERM),"DENY"); + Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_DEFAULT_GROUP_PERM),"PUB"); + Assert.assertEquals(((List)verifyMap.get(AclConstants.CONFIG_TOPIC_PERMS)).size(),2); + Assert.assertEquals(((List)verifyMap.get(AclConstants.CONFIG_GROUP_PERMS)).size(),2); + Assert.assertTrue(((List)verifyMap.get(AclConstants.CONFIG_TOPIC_PERMS)).contains("topicC=PUB|SUB")); + Assert.assertTrue(((List)verifyMap.get(AclConstants.CONFIG_TOPIC_PERMS)).contains("topicB=PUB")); + Assert.assertTrue(((List)verifyMap.get(AclConstants.CONFIG_GROUP_PERMS)).contains("groupB=PUB|SUB")); + Assert.assertTrue(((List)verifyMap.get(AclConstants.CONFIG_GROUP_PERMS)).contains("groupC=DENY")); + + // Verify the dateversion element is correct or not + List> dataVersions = (List>) readableMap.get(AclConstants.CONFIG_DATA_VERSION); + Assert.assertEquals(1,dataVersions.get(0).get(AclConstants.CONFIG_COUNTER)); + + // Update element in the acl config yaml file + PlainAccessConfig plainAccessConfig2 = new PlainAccessConfig(); + plainAccessConfig2.setAccessKey("rocketmq2"); + plainAccessConfig2.setSecretKey("1234567890123"); + + // Update acl access yaml config file secondly + plainAccessValidator.updateAccessConfig(plainAccessConfig2); + + Map readableMap2 = AclUtils.getYamlDataObject(targetFileName, Map.class); + List> accounts2 = (List>)readableMap2.get(AclConstants.CONFIG_ACCOUNTS); + Map verifyMap2 = null; + for (Map account : accounts2) { + if (account.get(AclConstants.CONFIG_ACCESS_KEY).equals(plainAccessConfig2.getAccessKey())) { + verifyMap2 = account; + break; + } + } + + // Verify the dateversion element after updating is correct or not + List> dataVersions2 = (List>) readableMap2.get(AclConstants.CONFIG_DATA_VERSION); + Assert.assertEquals(2,dataVersions2.get(0).get(AclConstants.CONFIG_COUNTER)); + Assert.assertEquals(verifyMap2.get(AclConstants.CONFIG_SECRET_KEY),"1234567890123"); + + + // Restore the backup file and flush to yaml file + AclUtils.writeDataObject(targetFileName, backUpAclConfigMap); + } + + @Test(expected = AclException.class) + public void updateAccessAclYamlConfigExceptionTest() { + System.setProperty("rocketmq.home.dir", "src/test/resources"); + System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl_update_create.yml"); + + PlainAccessConfig plainAccessConfig = new PlainAccessConfig(); + plainAccessConfig.setAccessKey("RocketMQ"); + plainAccessConfig.setSecretKey("12345"); + + PlainAccessValidator plainAccessValidator = new PlainAccessValidator(); + // Update acl access yaml config file + plainAccessValidator.updateAccessConfig(plainAccessConfig); + } + + @Test + public void deleteAccessAclYamlConfigNormalTest() { + System.setProperty("rocketmq.home.dir", "src/test/resources"); + System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl_delete.yml"); + + String targetFileName = "src/test/resources/conf/plain_acl_delete.yml"; + Map backUpAclConfigMap = AclUtils.getYamlDataObject(targetFileName, Map.class); + + + String accessKey = "rocketmq2"; + PlainAccessValidator plainAccessValidator = new PlainAccessValidator(); + plainAccessValidator.deleteAccessConfig(accessKey); + + Map readableMap = AclUtils.getYamlDataObject(targetFileName, Map.class); + List> accounts = (List>)readableMap.get(AclConstants.CONFIG_ACCOUNTS); + Map verifyMap = null; + for (Map account : accounts) { + if (account.get(AclConstants.CONFIG_ACCESS_KEY).equals(accessKey)) { + verifyMap = account; + break; + } + } + + // Verify the specified element is removed or not + Assert.assertEquals(verifyMap,null); + // Verify the dateversion element is correct or not + List> dataVersions = (List>) readableMap.get(AclConstants.CONFIG_DATA_VERSION); + Assert.assertEquals(1,dataVersions.get(0).get(AclConstants.CONFIG_COUNTER)); + + // Restore the backup file and flush to yaml file + AclUtils.writeDataObject(targetFileName, backUpAclConfigMap); + } + + @Test + public void updateAccessAclYamlConfigWithNoAccoutsExceptionTest() { + System.setProperty("rocketmq.home.dir", "src/test/resources"); + System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl_with_no_accouts.yml"); + + String targetFileName = "src/test/resources/conf/plain_acl_with_no_accouts.yml"; + Map backUpAclConfigMap = AclUtils.getYamlDataObject(targetFileName, Map.class); + + PlainAccessConfig plainAccessConfig = new PlainAccessConfig(); + plainAccessConfig.setAccessKey("RocketMQ"); + plainAccessConfig.setSecretKey("1234567890"); + + PlainAccessValidator plainAccessValidator = new PlainAccessValidator(); + // Update acl access yaml config file and verify the return value is true + Assert.assertEquals(plainAccessValidator.updateAccessConfig(plainAccessConfig), false); + } + + @Test + public void updateGlobalWhiteAddrsNormalTest() { + System.setProperty("rocketmq.home.dir", "src/test/resources"); + System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl_global_white_addrs.yml"); + + String targetFileName = "src/test/resources/conf/plain_acl_global_white_addrs.yml"; + Map backUpAclConfigMap = AclUtils.getYamlDataObject(targetFileName, Map.class); + + PlainAccessValidator plainAccessValidator = new PlainAccessValidator(); + // Update global white remote addr value list in the acl access yaml config file + + List globalWhiteAddrsList = new ArrayList(); + globalWhiteAddrsList.add("10.10.154.1"); + globalWhiteAddrsList.add("10.10.154.2"); + globalWhiteAddrsList.add("10.10.154.3"); + plainAccessValidator.updateGlobalWhiteAddrsConfig(globalWhiteAddrsList); + + Map readableMap = AclUtils.getYamlDataObject(targetFileName, Map.class); + + List globalWhiteAddrList = (List)readableMap.get(AclConstants.CONFIG_GLOBAL_WHITE_ADDRS); + Assert.assertTrue(globalWhiteAddrList.contains("10.10.154.1")); + Assert.assertTrue(globalWhiteAddrList.contains("10.10.154.2")); + Assert.assertTrue(globalWhiteAddrList.contains("10.10.154.3")); + + // Verify the dateversion element is correct or not + List> dataVersions = (List>) readableMap.get(AclConstants.CONFIG_DATA_VERSION); + Assert.assertEquals(1,dataVersions.get(0).get(AclConstants.CONFIG_COUNTER)); + + // Restore the backup file and flush to yaml file + AclUtils.writeDataObject(targetFileName, backUpAclConfigMap); + } + } diff --git a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionLoaderTest.java b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionManagerTest.java similarity index 75% rename from acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionLoaderTest.java rename to acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionManagerTest.java index 575c9018743f3890c0e07e21b6e34c876861caf8..d5ffb0c1d156226a7652bd4b08f4c1ae0e2cb07e 100644 --- a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionLoaderTest.java +++ b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionManagerTest.java @@ -26,32 +26,31 @@ import java.util.Map; import java.util.Set; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.rocketmq.acl.common.AclException; +import org.apache.rocketmq.acl.common.AclUtils; import org.apache.rocketmq.acl.common.Permission; -import org.apache.rocketmq.acl.plain.PlainPermissionLoader.PlainAccessConfig; -import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.PlainAccessConfig; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -public class PlainPermissionLoaderTest { +public class PlainPermissionManagerTest { - PlainPermissionLoader plainPermissionLoader; + PlainPermissionManager plainPermissionManager; PlainAccessResource PUBPlainAccessResource; PlainAccessResource SUBPlainAccessResource; PlainAccessResource ANYPlainAccessResource; PlainAccessResource DENYPlainAccessResource; PlainAccessResource plainAccessResource = new PlainAccessResource(); PlainAccessConfig plainAccessConfig = new PlainAccessConfig(); - PlainAccessResource plainAccessResourceTwo = new PlainAccessResource(); Set adminCode = new HashSet<>(); @Before public void init() throws NoSuchFieldException, SecurityException, IOException { - // UPDATE_AND_CREATE_TOPIC + // UPDATE_AND_CREATE_TOPIC adminCode.add(17); - // UPDATE_BROKER_CONFIG + // UPDATE_BROKER_CONFIG adminCode.add(25); - // DELETE_TOPIC_IN_BROKER + // DELETE_TOPIC_IN_BROKER adminCode.add(215); // UPDATE_AND_CREATE_SUBSCRIPTIONGROUP adminCode.add(200); @@ -65,7 +64,8 @@ public class PlainPermissionLoaderTest { System.setProperty("rocketmq.home.dir", "src/test/resources"); System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl.yml"); - plainPermissionLoader = new PlainPermissionLoader(); + + plainPermissionManager = new PlainPermissionManager(); } @@ -95,16 +95,16 @@ public class PlainPermissionLoaderTest { plainAccess.setAccessKey("RocketMQ"); plainAccess.setSecretKey("12345678"); - plainAccessResource = plainPermissionLoader.buildPlainAccessResource(plainAccess); + plainAccessResource = plainPermissionManager.buildPlainAccessResource(plainAccess); Assert.assertEquals(plainAccessResource.getAccessKey(), "RocketMQ"); Assert.assertEquals(plainAccessResource.getSecretKey(), "12345678"); plainAccess.setWhiteRemoteAddress("127.0.0.1"); - plainAccessResource = plainPermissionLoader.buildPlainAccessResource(plainAccess); + plainAccessResource = plainPermissionManager.buildPlainAccessResource(plainAccess); Assert.assertEquals(plainAccessResource.getWhiteRemoteAddress(), "127.0.0.1"); plainAccess.setAdmin(true); - plainAccessResource = plainPermissionLoader.buildPlainAccessResource(plainAccess); + plainAccessResource = plainPermissionManager.buildPlainAccessResource(plainAccess); Assert.assertEquals(plainAccessResource.isAdmin(), true); List groups = new ArrayList(); @@ -112,7 +112,7 @@ public class PlainPermissionLoaderTest { groups.add("groupB=PUB|SUB"); groups.add("groupC=PUB"); plainAccess.setGroupPerms(groups); - plainAccessResource = plainPermissionLoader.buildPlainAccessResource(plainAccess); + plainAccessResource = plainPermissionManager.buildPlainAccessResource(plainAccess); Map resourcePermMap = plainAccessResource.getResourcePermMap(); Assert.assertEquals(resourcePermMap.size(), 3); @@ -125,7 +125,7 @@ public class PlainPermissionLoaderTest { topics.add("topicB=PUB|SUB"); topics.add("topicC=PUB"); plainAccess.setTopicPerms(topics); - plainAccessResource = plainPermissionLoader.buildPlainAccessResource(plainAccess); + plainAccessResource = plainPermissionManager.buildPlainAccessResource(plainAccess); resourcePermMap = plainAccessResource.getResourcePermMap(); Assert.assertEquals(resourcePermMap.size(), 6); @@ -138,7 +138,7 @@ public class PlainPermissionLoaderTest { public void checkPermAdmin() { PlainAccessResource plainAccessResource = new PlainAccessResource(); plainAccessResource.setRequestCode(17); - plainPermissionLoader.checkPerm(plainAccessResource, PUBPlainAccessResource); + plainPermissionManager.checkPerm(plainAccessResource, PUBPlainAccessResource); } @Test @@ -146,15 +146,15 @@ public class PlainPermissionLoaderTest { PlainAccessResource plainAccessResource = new PlainAccessResource(); plainAccessResource.addResourceAndPerm("topicA", Permission.PUB); - plainPermissionLoader.checkPerm(plainAccessResource, PUBPlainAccessResource); + plainPermissionManager.checkPerm(plainAccessResource, PUBPlainAccessResource); plainAccessResource.addResourceAndPerm("topicB", Permission.SUB); - plainPermissionLoader.checkPerm(plainAccessResource, ANYPlainAccessResource); + plainPermissionManager.checkPerm(plainAccessResource, ANYPlainAccessResource); plainAccessResource = new PlainAccessResource(); plainAccessResource.addResourceAndPerm("topicB", Permission.SUB); - plainPermissionLoader.checkPerm(plainAccessResource, SUBPlainAccessResource); + plainPermissionManager.checkPerm(plainAccessResource, SUBPlainAccessResource); plainAccessResource.addResourceAndPerm("topicA", Permission.PUB); - plainPermissionLoader.checkPerm(plainAccessResource, ANYPlainAccessResource); + plainPermissionManager.checkPerm(plainAccessResource, ANYPlainAccessResource); } @Test(expected = AclException.class) @@ -162,55 +162,58 @@ public class PlainPermissionLoaderTest { plainAccessResource = new PlainAccessResource(); plainAccessResource.addResourceAndPerm("topicF", Permission.PUB); - plainPermissionLoader.checkPerm(plainAccessResource, SUBPlainAccessResource); + plainPermissionManager.checkPerm(plainAccessResource, SUBPlainAccessResource); } @Test(expected = AclException.class) public void accountNullTest() { plainAccessConfig.setAccessKey(null); - plainPermissionLoader.buildPlainAccessResource(plainAccessConfig); + plainPermissionManager.buildPlainAccessResource(plainAccessConfig); } @Test(expected = AclException.class) public void accountThanTest() { plainAccessConfig.setAccessKey("123"); - plainPermissionLoader.buildPlainAccessResource(plainAccessConfig); + plainPermissionManager.buildPlainAccessResource(plainAccessConfig); } @Test(expected = AclException.class) public void passWordtNullTest() { plainAccessConfig.setAccessKey(null); - plainPermissionLoader.buildPlainAccessResource(plainAccessConfig); + plainPermissionManager.buildPlainAccessResource(plainAccessConfig); } @Test(expected = AclException.class) public void passWordThanTest() { plainAccessConfig.setAccessKey("123"); - plainPermissionLoader.buildPlainAccessResource(plainAccessConfig); + plainPermissionManager.buildPlainAccessResource(plainAccessConfig); } @Test(expected = AclException.class) public void testPlainAclPlugEngineInit() { System.setProperty("rocketmq.home.dir", ""); - new PlainPermissionLoader().load(); + new PlainPermissionManager().load(); } @SuppressWarnings("unchecked") @Test public void cleanAuthenticationInfoTest() throws IllegalAccessException { - //plainPermissionLoader.addPlainAccessResource(plainAccessResource); - Map> plainAccessResourceMap = (Map>) FieldUtils.readDeclaredField(plainPermissionLoader, "plainAccessResourceMap", true); + // PlainPermissionManager.addPlainAccessResource(plainAccessResource); + Map> plainAccessResourceMap = (Map>) FieldUtils.readDeclaredField(plainPermissionManager, "plainAccessResourceMap", true); Assert.assertFalse(plainAccessResourceMap.isEmpty()); - plainPermissionLoader.clearPermissionInfo(); - plainAccessResourceMap = (Map>) FieldUtils.readDeclaredField(plainPermissionLoader, "plainAccessResourceMap", true); + plainPermissionManager.clearPermissionInfo(); + plainAccessResourceMap = (Map>) FieldUtils.readDeclaredField(plainPermissionManager, "plainAccessResourceMap", true); Assert.assertTrue(plainAccessResourceMap.isEmpty()); + // RemoveDataVersionFromYamlFile("src/test/resources/conf/plain_acl.yml"); } @Test public void isWatchStartTest() { - PlainPermissionLoader plainPermissionLoader = new PlainPermissionLoader(); - Assert.assertTrue(plainPermissionLoader.isWatchStart()); + PlainPermissionManager plainPermissionManager = new PlainPermissionManager(); + Assert.assertTrue(plainPermissionManager.isWatchStart()); + // RemoveDataVersionFromYamlFile("src/test/resources/conf/plain_acl.yml"); + } @@ -231,11 +234,12 @@ public class PlainPermissionLoaderTest { writer.flush(); writer.close(); - PlainPermissionLoader plainPermissionLoader = new PlainPermissionLoader(); - Assert.assertTrue(plainPermissionLoader.isWatchStart()); + + PlainPermissionManager plainPermissionManager = new PlainPermissionManager(); + Assert.assertTrue(plainPermissionManager.isWatchStart()); { - Map plainAccessResourceMap = (Map) FieldUtils.readDeclaredField(plainPermissionLoader, "plainAccessResourceMap", true); + Map plainAccessResourceMap = (Map) FieldUtils.readDeclaredField(plainPermissionManager, "plainAccessResourceMap", true); PlainAccessResource accessResource = plainAccessResourceMap.get("watchrocketmq"); Assert.assertNotNull(accessResource); Assert.assertEquals(accessResource.getSecretKey(), "12345678"); @@ -243,17 +247,19 @@ public class PlainPermissionLoaderTest { } - writer = new FileWriter(new File(fileName), true); - writer.write("- accessKey: watchrocketmq1\r\n"); - writer.write(" secretKey: 88888888\r\n"); - writer.write(" whiteRemoteAddress: 127.0.0.1\r\n"); - writer.write(" admin: false\r\n"); - writer.flush(); - writer.close(); + Map updatedMap = AclUtils.getYamlDataObject(fileName, Map.class); + List> accounts = (List>) updatedMap.get("accounts"); + accounts.get(0).remove("accessKey"); + accounts.get(0).remove("secretKey"); + accounts.get(0).put("accessKey", "watchrocketmq1"); + accounts.get(0).put("secretKey", "88888888"); + accounts.get(0).put("admin", "false"); + // Update file and flush to yaml file + AclUtils.writeDataObject(fileName, updatedMap); Thread.sleep(1000); { - Map plainAccessResourceMap = (Map) FieldUtils.readDeclaredField(plainPermissionLoader, "plainAccessResourceMap", true); + Map plainAccessResourceMap = (Map) FieldUtils.readDeclaredField(plainPermissionManager, "plainAccessResourceMap", true); PlainAccessResource accessResource = plainAccessResourceMap.get("watchrocketmq1"); Assert.assertNotNull(accessResource); Assert.assertEquals(accessResource.getSecretKey(), "88888888"); @@ -268,8 +274,7 @@ public class PlainPermissionLoaderTest { @Test(expected = AclException.class) public void initializeTest() { System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl_null.yml"); - new PlainPermissionLoader(); + new PlainPermissionManager(); } - } diff --git a/acl/src/test/resources/conf/plain_acl_correct.yml b/acl/src/test/resources/conf/plain_acl_correct.yml new file mode 100644 index 0000000000000000000000000000000000000000..59bd6d4ff298dfaf190876fd0684b1857186449b --- /dev/null +++ b/acl/src/test/resources/conf/plain_acl_correct.yml @@ -0,0 +1,39 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +## suggested format + +globalWhiteRemoteAddresses: +- 10.10.103.* +- 192.168.0.* +accounts: +- accessKey: RocketMQ + secretKey: 12345678 + whiteRemoteAddress: 192.168.0.* + admin: false + defaultTopicPerm: DENY + defaultGroupPerm: SUB + topicPerms: + - topicA=DENY + - topicB=PUB|SUB + - topicC=SUB + groupPerms: + - groupA=DENY + - groupB=SUB + - groupC=SUB +- accessKey: rocketmq2 + secretKey: 12345678 + whiteRemoteAddress: 192.168.1.* + admin: true diff --git a/acl/src/test/resources/conf/plain_acl_delete.yml b/acl/src/test/resources/conf/plain_acl_delete.yml new file mode 100644 index 0000000000000000000000000000000000000000..59bd6d4ff298dfaf190876fd0684b1857186449b --- /dev/null +++ b/acl/src/test/resources/conf/plain_acl_delete.yml @@ -0,0 +1,39 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +## suggested format + +globalWhiteRemoteAddresses: +- 10.10.103.* +- 192.168.0.* +accounts: +- accessKey: RocketMQ + secretKey: 12345678 + whiteRemoteAddress: 192.168.0.* + admin: false + defaultTopicPerm: DENY + defaultGroupPerm: SUB + topicPerms: + - topicA=DENY + - topicB=PUB|SUB + - topicC=SUB + groupPerms: + - groupA=DENY + - groupB=SUB + - groupC=SUB +- accessKey: rocketmq2 + secretKey: 12345678 + whiteRemoteAddress: 192.168.1.* + admin: true diff --git a/acl/src/test/resources/conf/plain_acl_global_white_addrs.yml b/acl/src/test/resources/conf/plain_acl_global_white_addrs.yml new file mode 100644 index 0000000000000000000000000000000000000000..59bd6d4ff298dfaf190876fd0684b1857186449b --- /dev/null +++ b/acl/src/test/resources/conf/plain_acl_global_white_addrs.yml @@ -0,0 +1,39 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +## suggested format + +globalWhiteRemoteAddresses: +- 10.10.103.* +- 192.168.0.* +accounts: +- accessKey: RocketMQ + secretKey: 12345678 + whiteRemoteAddress: 192.168.0.* + admin: false + defaultTopicPerm: DENY + defaultGroupPerm: SUB + topicPerms: + - topicA=DENY + - topicB=PUB|SUB + - topicC=SUB + groupPerms: + - groupA=DENY + - groupB=SUB + - groupC=SUB +- accessKey: rocketmq2 + secretKey: 12345678 + whiteRemoteAddress: 192.168.1.* + admin: true diff --git a/acl/src/test/resources/conf/plain_acl_update_create.yml b/acl/src/test/resources/conf/plain_acl_update_create.yml new file mode 100644 index 0000000000000000000000000000000000000000..59bd6d4ff298dfaf190876fd0684b1857186449b --- /dev/null +++ b/acl/src/test/resources/conf/plain_acl_update_create.yml @@ -0,0 +1,39 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +## suggested format + +globalWhiteRemoteAddresses: +- 10.10.103.* +- 192.168.0.* +accounts: +- accessKey: RocketMQ + secretKey: 12345678 + whiteRemoteAddress: 192.168.0.* + admin: false + defaultTopicPerm: DENY + defaultGroupPerm: SUB + topicPerms: + - topicA=DENY + - topicB=PUB|SUB + - topicC=SUB + groupPerms: + - groupA=DENY + - groupB=SUB + - groupC=SUB +- accessKey: rocketmq2 + secretKey: 12345678 + whiteRemoteAddress: 192.168.1.* + admin: true diff --git a/acl/src/test/resources/conf/plain_acl_with_no_accouts.yml b/acl/src/test/resources/conf/plain_acl_with_no_accouts.yml new file mode 100644 index 0000000000000000000000000000000000000000..939f7c98ca6f154bed0b6ece2ea194ff898ef30e --- /dev/null +++ b/acl/src/test/resources/conf/plain_acl_with_no_accouts.yml @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +## suggested format + +globalWhiteRemoteAddresses: +- 10.10.103.* +- 192.168.0.* \ No newline at end of file diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 427f86118905e5eb18d0cbd0a1bced777706e050..56e3fe4b0d596c69875c726668393b3af727cc05 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -19,6 +19,7 @@ package org.apache.rocketmq.broker; import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -162,7 +163,7 @@ public class BrokerController { private TransactionalMessageService transactionalMessageService; private AbstractTransactionalMessageCheckListener transactionalMessageCheckListener; private Future slaveSyncFuture; - + private Map accessValidatorMap = new HashMap(); public BrokerController( final BrokerConfig brokerConfig, @@ -502,6 +503,7 @@ public class BrokerController { for (AccessValidator accessValidator: accessValidators) { final AccessValidator validator = accessValidator; + accessValidatorMap.put(validator.getClass(),validator); this.registerServerRPCHook(new RPCHook() { @Override @@ -1101,7 +1103,9 @@ public class BrokerController { } - + public Map getAccessValidatorMap() { + return accessValidatorMap; + } private void handleSlaveSynchronize(BrokerRole role) { if (role == BrokerRole.SLAVE) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 73fe43942709136ec2d94c879feb66b9a0e23286..f23cca62d9e7b093a281b8b63728bfa5183bd706 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -30,6 +30,8 @@ import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentMap; +import org.apache.rocketmq.acl.AccessValidator; +import org.apache.rocketmq.acl.plain.PlainAccessValidator; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.ConsumerGroupInfo; @@ -37,6 +39,7 @@ import org.apache.rocketmq.broker.filter.ConsumerFilterData; import org.apache.rocketmq.broker.filter.ExpressionMessageFilter; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.admin.ConsumeStats; @@ -44,6 +47,10 @@ import org.apache.rocketmq.common.admin.OffsetWrapper; import org.apache.rocketmq.common.admin.TopicOffset; import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.protocol.header.CreateAccessConfigRequestHeader; +import org.apache.rocketmq.common.protocol.header.DeleteAccessConfigRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHeader; +import org.apache.rocketmq.common.protocol.header.UpdateGlobalWhiteAddrsConfigRequestHeader; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.common.message.MessageDecoder; @@ -201,6 +208,14 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return fetchAllConsumeStatsInBroker(ctx, request); case RequestCode.QUERY_CONSUME_QUEUE: return queryConsumeQueue(ctx, request); + case RequestCode.UPDATE_AND_CREATE_ACL_CONFIG: + return updateAndCreateAccessConfig(ctx, request); + case RequestCode.DELETE_ACL_CONFIG: + return deleteAccessConfig(ctx, request); + case RequestCode.GET_BROKER_CLUSTER_ACL_INFO: + return getBrokerAclConfigVersion(ctx, request); + case RequestCode.UPDATE_GLOBAL_WHITE_ADDRS_CONFIG: + return updateGlobalWhiteAddrsConfig(ctx, request); default: break; } @@ -269,6 +284,140 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return response; } + private synchronized RemotingCommand updateAndCreateAccessConfig(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + + final CreateAccessConfigRequestHeader requestHeader = + (CreateAccessConfigRequestHeader) request.decodeCommandCustomHeader(CreateAccessConfigRequestHeader.class); + + PlainAccessConfig accessConfig = new PlainAccessConfig(); + accessConfig.setAccessKey(requestHeader.getAccessKey()); + accessConfig.setSecretKey(requestHeader.getSecretKey()); + accessConfig.setWhiteRemoteAddress(requestHeader.getWhiteRemoteAddress()); + accessConfig.setDefaultTopicPerm(requestHeader.getDefaultTopicPerm()); + accessConfig.setDefaultGroupPerm(requestHeader.getDefaultGroupPerm()); + accessConfig.setTopicPerms(UtilAll.String2List(requestHeader.getTopicPerms(),",")); + accessConfig.setGroupPerms(UtilAll.String2List(requestHeader.getGroupPerms(),",")); + accessConfig.setAdmin(requestHeader.isAdmin()); + try { + + AccessValidator accessValidator = this.brokerController.getAccessValidatorMap().get(PlainAccessValidator.class); + if (accessValidator.updateAccessConfig(accessConfig)) { + response.setCode(ResponseCode.SUCCESS); + response.setOpaque(request.getOpaque()); + response.markResponseType(); + response.setRemark(null); + ctx.writeAndFlush(response); + } else { + String errorMsg = "The accesskey[" + requestHeader.getAccessKey() + "] corresponding to accessConfig has been updated failed."; + log.warn(errorMsg); + response.setCode(ResponseCode.UPDATE_AND_CREATE_ACL_CONFIG_FAILED); + response.setRemark(errorMsg); + return response; + } + } catch (Exception e) { + log.error("Failed to generate a proper update accessvalidator response", e); + response.setCode(ResponseCode.UPDATE_AND_CREATE_ACL_CONFIG_FAILED); + response.setRemark(e.getMessage()); + return response; + } + + return null; + } + + private synchronized RemotingCommand deleteAccessConfig(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + + final DeleteAccessConfigRequestHeader requestHeader = + (DeleteAccessConfigRequestHeader) request.decodeCommandCustomHeader(DeleteAccessConfigRequestHeader.class); + log.info("DeleteAccessConfig called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel())); + + try { + String accessKey = requestHeader.getAccessKey(); + AccessValidator accessValidator = this.brokerController.getAccessValidatorMap().get(PlainAccessValidator.class); + if (accessValidator.deleteAccessConfig(accessKey)) { + response.setCode(ResponseCode.SUCCESS); + response.setOpaque(request.getOpaque()); + response.markResponseType(); + response.setRemark(null); + ctx.writeAndFlush(response); + } else { + String errorMsg = "The accesskey[" + requestHeader.getAccessKey() + "] corresponding to accessConfig has been deleted failed."; + log.warn(errorMsg); + response.setCode(ResponseCode.DELETE_ACL_CONFIG_FAILED); + response.setRemark(errorMsg); + return response; + } + + } catch (Exception e) { + log.error("Failed to generate a proper delete accessvalidator response", e); + response.setCode(ResponseCode.DELETE_ACL_CONFIG_FAILED); + response.setRemark(e.getMessage()); + return response; + } + + return null; + } + + private synchronized RemotingCommand updateGlobalWhiteAddrsConfig(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { + + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + + final UpdateGlobalWhiteAddrsConfigRequestHeader requestHeader = + (UpdateGlobalWhiteAddrsConfigRequestHeader) request.decodeCommandCustomHeader(UpdateGlobalWhiteAddrsConfigRequestHeader.class); + + try { + AccessValidator accessValidator = this.brokerController.getAccessValidatorMap().get(PlainAccessValidator.class); + if (accessValidator.updateGlobalWhiteAddrsConfig(UtilAll.String2List(requestHeader.getGlobalWhiteAddrs(),","))) { + response.setCode(ResponseCode.SUCCESS); + response.setOpaque(request.getOpaque()); + response.markResponseType(); + response.setRemark(null); + ctx.writeAndFlush(response); + } else { + String errorMsg = "The globalWhiteAddresses[" + requestHeader.getGlobalWhiteAddrs() + "] has been updated failed."; + log.warn(errorMsg); + response.setCode(ResponseCode.UPDATE_GLOBAL_WHITE_ADDRS_CONFIG_FAILED); + response.setRemark(errorMsg); + return response; + } + } catch (Exception e) { + log.error("Failed to generate a proper update globalWhiteAddresses response", e); + response.setCode(ResponseCode.UPDATE_GLOBAL_WHITE_ADDRS_CONFIG_FAILED); + response.setRemark(e.getMessage()); + return response; + } + + return null; + } + + private RemotingCommand getBrokerAclConfigVersion(ChannelHandlerContext ctx, RemotingCommand request) { + + final RemotingCommand response = RemotingCommand.createResponseCommand(GetBrokerAclConfigResponseHeader.class); + + final GetBrokerAclConfigResponseHeader responseHeader = (GetBrokerAclConfigResponseHeader)response.readCustomHeader(); + + try { + AccessValidator accessValidator = this.brokerController.getAccessValidatorMap().get(PlainAccessValidator.class); + + responseHeader.setVersion(accessValidator.getAclConfigVersion()); + responseHeader.setBrokerAddr(this.brokerController.getBrokerAddr()); + responseHeader.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName()); + responseHeader.setClusterName(this.brokerController.getBrokerConfig().getBrokerClusterName()); + + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } catch (Exception e) { + log.error("Failed to generate a proper getBrokerAclConfigVersion response", e); + } + + return null; + } + private RemotingCommand getAllTopicConfig(ChannelHandlerContext ctx, RemotingCommand request) { final RemotingCommand response = RemotingCommand.createResponseCommand(GetAllTopicConfigResponseHeader.class); // final GetAllTopicConfigResponseHeader responseHeader = diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 9048ab8545c825487cec0c3b5613d45e90bbbed0..c3382caa280aaac1baa7bb429d228884195403a2 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -44,8 +44,10 @@ import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.admin.ConsumeStats; @@ -63,6 +65,7 @@ import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.body.BrokerStatsData; import org.apache.rocketmq.common.protocol.body.CheckClientRequestBody; +import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo; import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.common.protocol.body.ConsumeStatsList; @@ -86,10 +89,13 @@ import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody; import org.apache.rocketmq.common.protocol.header.CloneGroupOffsetRequestHeader; import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader; import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader; +import org.apache.rocketmq.common.protocol.header.CreateAccessConfigRequestHeader; import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader; +import org.apache.rocketmq.common.protocol.header.DeleteAccessConfigRequestHeader; import org.apache.rocketmq.common.protocol.header.DeleteSubscriptionGroupRequestHeader; import org.apache.rocketmq.common.protocol.header.DeleteTopicRequestHeader; import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHeader; import org.apache.rocketmq.common.protocol.header.GetConsumeStatsInBrokerHeader; import org.apache.rocketmq.common.protocol.header.GetConsumeStatsRequestHeader; import org.apache.rocketmq.common.protocol.header.GetConsumerConnectionListRequestHeader; @@ -123,6 +129,7 @@ import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2; import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader; import org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader; import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.header.UpdateGlobalWhiteAddrsConfigRequestHeader; import org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHeader; import org.apache.rocketmq.common.protocol.header.ViewMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterMessageFilterClassRequestHeader; @@ -284,6 +291,104 @@ public class MQClientAPIImpl { throw new MQClientException(response.getCode(), response.getRemark()); } + public void createPlainAccessConfig(final String addr, final PlainAccessConfig plainAccessConfig, + final long timeoutMillis) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + CreateAccessConfigRequestHeader requestHeader = new CreateAccessConfigRequestHeader(); + requestHeader.setAccessKey(plainAccessConfig.getAccessKey()); + requestHeader.setSecretKey(plainAccessConfig.getSecretKey()); + requestHeader.setAdmin(plainAccessConfig.isAdmin()); + requestHeader.setDefaultGroupPerm(plainAccessConfig.getDefaultGroupPerm()); + requestHeader.setDefaultTopicPerm(plainAccessConfig.getDefaultTopicPerm()); + requestHeader.setWhiteRemoteAddress(plainAccessConfig.getWhiteRemoteAddress()); + requestHeader.setTopicPerms(UtilAll.List2String(plainAccessConfig.getTopicPerms(),",")); + requestHeader.setGroupPerms(UtilAll.List2String(plainAccessConfig.getGroupPerms(),",")); + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_ACL_CONFIG, requestHeader); + + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), + request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + return; + } + default: + break; + } + + throw new MQClientException(response.getCode(), response.getRemark()); + } + + public void deleteAccessConfig(final String addr, final String accessKey, final long timeoutMillis) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + DeleteAccessConfigRequestHeader requestHeader = new DeleteAccessConfigRequestHeader(); + requestHeader.setAccessKey(accessKey); + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_ACL_CONFIG, requestHeader); + + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), + request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + return; + } + default: + break; + } + + throw new MQClientException(response.getCode(), response.getRemark()); + } + + public void updateGlobalWhiteAddrsConfig(final String addr, final String globalWhiteAddrs,final long timeoutMillis) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + + UpdateGlobalWhiteAddrsConfigRequestHeader requestHeader = new UpdateGlobalWhiteAddrsConfigRequestHeader(); + requestHeader.setGlobalWhiteAddrs(globalWhiteAddrs); + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_GLOBAL_WHITE_ADDRS_CONFIG, requestHeader); + + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), + request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + return; + } + default: + break; + } + + throw new MQClientException(response.getCode(), response.getRemark()); + } + + public ClusterAclVersionInfo getBrokerClusterAclInfo(final String addr,final long timeoutMillis) throws RemotingCommandException, InterruptedException, RemotingTimeoutException, + RemotingSendRequestException, RemotingConnectException, MQBrokerException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_ACL_INFO, null); + + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + GetBrokerAclConfigResponseHeader responseHeader = + (GetBrokerAclConfigResponseHeader) response.decodeCommandCustomHeader(GetBrokerAclConfigResponseHeader.class); + + ClusterAclVersionInfo clusterAclVersionInfo = new ClusterAclVersionInfo(); + clusterAclVersionInfo.setClusterName(responseHeader.getClusterName()); + clusterAclVersionInfo.setBrokerName(responseHeader.getBrokerName()); + clusterAclVersionInfo.setBrokerAddr(responseHeader.getBrokerAddr()); + clusterAclVersionInfo.setAclConfigDataVersion(DataVersion.fromJson(responseHeader.getVersion(), DataVersion.class)); + return clusterAclVersionInfo; + } + default: + break; + } + + throw new MQBrokerException(response.getCode(), response.getRemark()); + + } + public SendResult sendMessage( final String addr, final String brokerName, diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java index c13e75c206c793b2c1bc23a593de89e21f9ac10e..28555959ba9ba268ec1ded6ae24faae23837c15e 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java @@ -19,17 +19,24 @@ package org.apache.rocketmq.client.impl; import java.lang.reflect.Field; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.hook.SendMessageContext; import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.DataVersion; +import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo; +import org.apache.rocketmq.common.protocol.header.CreateAccessConfigRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHeader; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader; +import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader; import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.RemotingClient; import org.apache.rocketmq.remoting.exception.RemotingException; @@ -210,6 +217,84 @@ public class MQClientAPIImplTest { } } + @Test + public void testCreatePlainAccessConfig_Success() throws InterruptedException, RemotingException, MQBrokerException { + + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock mock) throws Throwable { + RemotingCommand request = mock.getArgument(1); + return createSuccessResponse4UpdateAclConfig(request); + } + }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong()); + + PlainAccessConfig config = createUpdateAclConfig(); + + try { + mqClientAPI.createPlainAccessConfig(brokerAddr, config, 3 * 1000); + } catch (MQClientException ex) { + + } + } + + @Test + public void testCreatePlainAccessConfig_Exception() throws InterruptedException, RemotingException, MQBrokerException { + + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock mock) throws Throwable { + RemotingCommand request = mock.getArgument(1); + return createErrorResponse4UpdateAclConfig(request); + } + }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong()); + + PlainAccessConfig config = createUpdateAclConfig(); + try { + mqClientAPI.createPlainAccessConfig(brokerAddr, config, 3 * 1000); + } catch (MQClientException ex) { + assertThat(ex.getResponseCode()).isEqualTo(209); + assertThat(ex.getErrorMessage()).isEqualTo("corresponding to accessConfig has been updated failed"); + } + } + + @Test + public void testDeleteAccessConfig_Success() throws InterruptedException, RemotingException, MQBrokerException { + + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock mock) throws Throwable { + RemotingCommand request = mock.getArgument(1); + return createSuccessResponse4DeleteAclConfig(request); + } + }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong()); + + String accessKey = "1234567"; + try { + mqClientAPI.deleteAccessConfig(brokerAddr, accessKey, 3 * 1000); + } catch (MQClientException ex) { + + } + } + + @Test + public void testDeleteAccessConfig_Exception() throws InterruptedException, RemotingException, MQBrokerException { + + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock mock) throws Throwable { + RemotingCommand request = mock.getArgument(1); + return createErrorResponse4DeleteAclConfig(request); + } + }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong()); + + try { + mqClientAPI.deleteAccessConfig(brokerAddr, "11111", 3 * 1000); + } catch (MQClientException ex) { + assertThat(ex.getResponseCode()).isEqualTo(210); + assertThat(ex.getErrorMessage()).isEqualTo("corresponding to accessConfig has been deleted failed"); + } + } + private RemotingCommand createSuccessResponse(RemotingCommand request) { RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); response.setCode(ResponseCode.SUCCESS); @@ -228,6 +313,58 @@ public class MQClientAPIImplTest { return response; } + private RemotingCommand createSuccessResponse4UpdateAclConfig(RemotingCommand request) { + RemotingCommand response = RemotingCommand.createResponseCommand(null); + response.setCode(ResponseCode.SUCCESS); + response.setOpaque(request.getOpaque()); + response.markResponseType(); + response.setRemark(null); + + return response; + } + + private RemotingCommand createSuccessResponse4DeleteAclConfig(RemotingCommand request) { + RemotingCommand response = RemotingCommand.createResponseCommand(null); + response.setCode(ResponseCode.SUCCESS); + response.setOpaque(request.getOpaque()); + response.markResponseType(); + response.setRemark(null); + + return response; + } + + private RemotingCommand createErrorResponse4UpdateAclConfig(RemotingCommand request) { + RemotingCommand response = RemotingCommand.createResponseCommand(null); + response.setCode(ResponseCode.UPDATE_AND_CREATE_ACL_CONFIG_FAILED); + response.setOpaque(request.getOpaque()); + response.markResponseType(); + response.setRemark("corresponding to accessConfig has been updated failed"); + + return response; + } + + private RemotingCommand createErrorResponse4DeleteAclConfig(RemotingCommand request) { + RemotingCommand response = RemotingCommand.createResponseCommand(null); + response.setCode(ResponseCode.DELETE_ACL_CONFIG_FAILED); + response.setOpaque(request.getOpaque()); + response.markResponseType(); + response.setRemark("corresponding to accessConfig has been deleted failed"); + + return response; + } + + private PlainAccessConfig createUpdateAclConfig() { + + PlainAccessConfig config = new PlainAccessConfig(); + config.setAccessKey("Rocketmq111"); + config.setSecretKey("123456789"); + config.setAdmin(true); + config.setWhiteRemoteAddress("127.0.0.1"); + config.setDefaultTopicPerm("DENY"); + config.setDefaultGroupPerm("SUB"); + return config; + } + private SendMessageRequestHeader createSendMessageRequestHeader() { SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); requestHeader.setBornTimestamp(System.currentTimeMillis()); diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java index b45ad02818594a414783a3708da5d8668decab63..0d00c9bdb448841226c093e1cf0d00d532c62104 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java @@ -200,9 +200,6 @@ public class DefaultMQConsumerWithTraceTest { @Test public void testPullMessage_WithTrace_Success() throws InterruptedException, RemotingException, MQBrokerException, MQClientException { traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl()); - //when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); - //when(mQClientTraceAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTraceTopicRoute()); - final CountDownLatch countDownLatch = new CountDownLatch(1); final MessageExt[] messageExts = new MessageExt[1]; diff --git a/common/src/main/java/org/apache/rocketmq/common/PlainAccessConfig.java b/common/src/main/java/org/apache/rocketmq/common/PlainAccessConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..b193f43711fea332ba3668e51b9eaabadce1b124 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/PlainAccessConfig.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common; + +import java.util.List; + +public class PlainAccessConfig { + + private String accessKey; + + private String secretKey; + + private String whiteRemoteAddress; + + private boolean admin; + + private String defaultTopicPerm; + + private String defaultGroupPerm; + + private List topicPerms; + + private List groupPerms; + + public String getAccessKey() { + return accessKey; + } + + public void setAccessKey(String accessKey) { + this.accessKey = accessKey; + } + + public String getSecretKey() { + return secretKey; + } + + public void setSecretKey(String secretKey) { + this.secretKey = secretKey; + } + + public String getWhiteRemoteAddress() { + return whiteRemoteAddress; + } + + public void setWhiteRemoteAddress(String whiteRemoteAddress) { + this.whiteRemoteAddress = whiteRemoteAddress; + } + + public boolean isAdmin() { + return admin; + } + + public void setAdmin(boolean admin) { + this.admin = admin; + } + + public String getDefaultTopicPerm() { + return defaultTopicPerm; + } + + public void setDefaultTopicPerm(String defaultTopicPerm) { + this.defaultTopicPerm = defaultTopicPerm; + } + + public String getDefaultGroupPerm() { + return defaultGroupPerm; + } + + public void setDefaultGroupPerm(String defaultGroupPerm) { + this.defaultGroupPerm = defaultGroupPerm; + } + + public List getTopicPerms() { + return topicPerms; + } + + public void setTopicPerms(List topicPerms) { + this.topicPerms = topicPerms; + } + + public List getGroupPerms() { + return groupPerms; + } + + public void setGroupPerms(List groupPerms) { + this.groupPerms = groupPerms; + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java index dee6ca291144d7d55265913cbb9fab71cccf1637..33674dc8a24f7038543ec276c2715466852346d1 100644 --- a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java @@ -28,15 +28,17 @@ import java.net.NetworkInterface; import java.text.NumberFormat; import java.text.ParseException; import java.text.SimpleDateFormat; +import java.util.Arrays; import java.util.Calendar; import java.util.Date; import java.util.Enumeration; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.zip.CRC32; import java.util.zip.DeflaterOutputStream; import java.util.zip.InflaterInputStream; - +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; @@ -529,4 +531,28 @@ public class UtilAll { file.delete(); } } + + public static String List2String(List list,String splitor) { + if (list == null || list.size() == 0) { + return null; + } + StringBuffer str = new StringBuffer(); + for (int i = 0;i < list.size();i++) { + str.append(list.get(i)); + if (i == list.size() - 1) { + continue; + } + str.append(splitor); + } + return str.toString(); + } + + public static List String2List(String str,String splitor) { + if (StringUtils.isEmpty(str)) { + return null; + } + + String[] addrArray = str.split(splitor); + return Arrays.asList(addrArray); + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java index 8cf2d46adad6920736b357e4ab2ef2fff8fd5f66..b771b77b659949ba8564bd474031b4c79386bf18 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java @@ -70,6 +70,14 @@ public class RequestCode { public static final int CHECK_CLIENT_CONFIG = 46; + public static final int UPDATE_AND_CREATE_ACL_CONFIG = 50; + + public static final int DELETE_ACL_CONFIG = 51; + + public static final int GET_BROKER_CLUSTER_ACL_INFO = 52; + + public static final int UPDATE_GLOBAL_WHITE_ADDRS_CONFIG = 53; + public static final int PUT_KV_CONFIG = 100; public static final int GET_KV_CONFIG = 101; diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java index f62c4ea61874528ad8d922972ccf358c603c62b4..dc744448f6cf6999f1b8cb79860c8d916de88610 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java @@ -73,4 +73,11 @@ public class ResponseCode extends RemotingSysResponseCode { public static final int CONSUME_MSG_TIMEOUT = 207; public static final int NO_MESSAGE = 208; + + public static final int UPDATE_AND_CREATE_ACL_CONFIG_FAILED = 209; + + public static final int DELETE_ACL_CONFIG_FAILED = 210; + + public static final int UPDATE_GLOBAL_WHITE_ADDRS_CONFIG_FAILED = 211; + } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ClusterAclVersionInfo.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ClusterAclVersionInfo.java new file mode 100644 index 0000000000000000000000000000000000000000..aeae9d59a96f4d3311d93da02fc0eebf5cb1d6ea --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ClusterAclVersionInfo.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.protocol.body; + +import org.apache.rocketmq.common.DataVersion; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + +public class ClusterAclVersionInfo extends RemotingSerializable { + + private String brokerName; + + private String brokerAddr; + + private DataVersion aclConfigDataVersion; + + private String clusterName; + + public String getBrokerName() { + return brokerName; + } + + public void setBrokerName(String brokerName) { + this.brokerName = brokerName; + } + + public String getBrokerAddr() { + return brokerAddr; + } + + public void setBrokerAddr(String brokerAddr) { + this.brokerAddr = brokerAddr; + } + + + public String getClusterName() { + return clusterName; + } + + public void setClusterName(String clusterName) { + this.clusterName = clusterName; + } + + public DataVersion getAclConfigDataVersion() { + return aclConfigDataVersion; + } + + public void setAclConfigDataVersion(DataVersion aclConfigDataVersion) { + this.aclConfigDataVersion = aclConfigDataVersion; + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/CreateAccessConfigRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CreateAccessConfigRequestHeader.java new file mode 100644 index 0000000000000000000000000000000000000000..36990fcf641154f9b9f1f8ccda22e2906dc9a730 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CreateAccessConfigRequestHeader.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.header; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public class CreateAccessConfigRequestHeader implements CommandCustomHeader { + + @CFNotNull + private String accessKey; + + private String secretKey; + + private String whiteRemoteAddress; + + private boolean admin; + + private String defaultTopicPerm; + + private String defaultGroupPerm; + + // list string,eg: topicA=DENY,topicD=SUB + private String topicPerms; + + // list string,eg: groupD=DENY,groupD=SUB + private String groupPerms; + + + @Override public void checkFields() throws RemotingCommandException { + + } + + public String getAccessKey() { + return accessKey; + } + + public void setAccessKey(String accessKey) { + this.accessKey = accessKey; + } + + public String getSecretKey() { + return secretKey; + } + + public void setSecretKey(String secretKey) { + this.secretKey = secretKey; + } + + public String getWhiteRemoteAddress() { + return whiteRemoteAddress; + } + + public void setWhiteRemoteAddress(String whiteRemoteAddress) { + this.whiteRemoteAddress = whiteRemoteAddress; + } + + public boolean isAdmin() { + return admin; + } + + public void setAdmin(boolean admin) { + this.admin = admin; + } + + public String getDefaultTopicPerm() { + return defaultTopicPerm; + } + + public void setDefaultTopicPerm(String defaultTopicPerm) { + this.defaultTopicPerm = defaultTopicPerm; + } + + public String getDefaultGroupPerm() { + return defaultGroupPerm; + } + + public void setDefaultGroupPerm(String defaultGroupPerm) { + this.defaultGroupPerm = defaultGroupPerm; + } + + public String getTopicPerms() { + return topicPerms; + } + + public void setTopicPerms(String topicPerms) { + this.topicPerms = topicPerms; + } + + public String getGroupPerms() { + return groupPerms; + } + + public void setGroupPerms(String groupPerms) { + this.groupPerms = groupPerms; + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/DeleteAccessConfigRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/DeleteAccessConfigRequestHeader.java new file mode 100644 index 0000000000000000000000000000000000000000..293480ce3210a300758b0c32a0b8c5c33cf484c8 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/DeleteAccessConfigRequestHeader.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.header; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public class DeleteAccessConfigRequestHeader implements CommandCustomHeader { + + @CFNotNull + private String accessKey; + + @Override public void checkFields() throws RemotingCommandException { + + } + + public String getAccessKey() { + return accessKey; + } + + public void setAccessKey(String accessKey) { + this.accessKey = accessKey; + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetBrokerAclConfigResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetBrokerAclConfigResponseHeader.java new file mode 100644 index 0000000000000000000000000000000000000000..43fbe47ae7f67109dbf2d6f86a386e64a646854c --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetBrokerAclConfigResponseHeader.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.protocol.header; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public class GetBrokerAclConfigResponseHeader implements CommandCustomHeader { + + @CFNotNull + private String version; + + @CFNotNull + private String brokerName; + + @CFNotNull + private String brokerAddr; + + @CFNotNull + private String clusterName; + + @Override public void checkFields() throws RemotingCommandException { + } + + public String getVersion() { + return version; + } + + public void setVersion(String version) { + this.version = version; + } + + public String getBrokerName() { + return brokerName; + } + + public void setBrokerName(String brokerName) { + this.brokerName = brokerName; + } + + public String getBrokerAddr() { + return brokerAddr; + } + + public void setBrokerAddr(String brokerAddr) { + this.brokerAddr = brokerAddr; + } + + public String getClusterName() { + return clusterName; + } + + public void setClusterName(String clusterName) { + this.clusterName = clusterName; + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateGlobalWhiteAddrsConfigRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateGlobalWhiteAddrsConfigRequestHeader.java new file mode 100644 index 0000000000000000000000000000000000000000..2d42c750cf25e6c329428b91eb77e08df53975cd --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateGlobalWhiteAddrsConfigRequestHeader.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.protocol.header; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public class UpdateGlobalWhiteAddrsConfigRequestHeader implements CommandCustomHeader { + + @CFNotNull + private String globalWhiteAddrs; + + @Override public void checkFields() throws RemotingCommandException { + + } + + public String getGlobalWhiteAddrs() { + return globalWhiteAddrs; + } + + public void setGlobalWhiteAddrs(String globalWhiteAddrs) { + this.globalWhiteAddrs = globalWhiteAddrs; + } +} diff --git a/docs/cn/acl/user_guide.md b/docs/cn/acl/user_guide.md index 1fea9ef54a0a3f916db5d889b53c746cae6bc81c..01c37dc85bd15f9d424abb39aba31167aa7a06a0 100644 --- a/docs/cn/acl/user_guide.md +++ b/docs/cn/acl/user_guide.md @@ -82,5 +82,76 @@ RocketMQ的权限控制存储的默认实现是基于yml配置文件。用户可 (2)如果ACL与高可用部署(多副本Dledger架构)同时启用,由于出现节点宕机时,Dledger Group组内会自动选主,那么就需要将Dledger Group组 内所有Broker节点的plain_acl.yml配置文件的白名单设置所有Broker节点的ip地址。 -**特别注意**在[4.5.0]版本中即使使用上面所述的白名单也无法解决开启ACL的问题,解决该问题的[PR链接](https://github.com/apache/rocketmq/pull/1149) +## 7. ACL mqadmin配置管理命令 +### 7.1 更新ACL配置文件中“account”的属性值 + +该命令的示例如下: + +sh mqadmin updateAclConfig -n 192.168.1.2:9876 -b 192.168.12.134:10911 -a RocketMQ -s 1234567809123 +-t topicA=DENY,topicD=SUB -g groupD=DENY,groupB=SUB + +说明:如果不存在则会在ACL Config YAML配置文件中创建;若存在,则会更新对应的“accounts”的属性值; +如果指定的是集群名称,则会在集群中各个broker节点执行该命令;否则会在单个broker节点执行该命令。 + +| 参数 | 取值 | 含义 | +| --- | --- | --- | +| n | eg:192.168.1.2:9876 | namesrv地址(必填) | +| c | eg:DefaultCluster | 指定集群名称(与broker地址二选一) | +| b | eg:192.168.12.134:10911 | 指定broker地址(与集群名称二选一) | +| a | eg:RocketMQ | Access Key值(必填) | +| s | eg:1234567809123 | Secret Key值(可选) | +| m | eg:true | 是否管理员账户(可选) | +| w | eg:192.168.0.* | whiteRemoteAddress,用户IP白名单(可选) | +| i | eg:DENY;PUB;SUB;PUB\|SUB | defaultTopicPerm,默认Topic权限(可选) | +| u | eg:DENY;PUB;SUB;PUB\|SUB | defaultGroupPerm,默认ConsumerGroup权限(可选) | +| t | eg:topicA=DENY,topicD=SUB | topicPerms,各个Topic的权限(可选) | +| g | eg:groupD=DENY,groupB=SUB | groupPerms,各个ConsumerGroup的权限(可选) | + +### 7.2 删除ACL配置文件里面的对应“account” +该命令的示例如下: + +sh mqadmin deleteAccessConfig -n 192.168.1.2:9876 -c DefaultCluster -a RocketMQ + +说明:如果指定的是集群名称,则会在集群中各个broker节点执行该命令;否则会在单个broker节点执行该命令。 +其中,参数"a"为Access Key的值,用以标识唯一账户id,因此该命令的参数中指定账户id即可。 + +| 参数 | 取值 | 含义 | +| --- | --- | --- | +| n | eg:192.168.1.2:9876 | namesrv地址(必填) | +| c | eg:DefaultCluster | 指定集群名称(与broker地址二选一) | +| b | eg:192.168.12.134:10911 | 指定broker地址(与集群名称二选一) | +| a | eg:RocketMQ | Access Key的值(必填) | + + +### 7.3 更新ACL配置文件里面中的全局白名单 +该命令的示例如下: + +sh mqadmin updateGlobalWhiteAddr -n 192.168.1.2:9876 -b 192.168.12.134:10911 -g 10.10.154.1,10.10.154.2 + +说明:如果指定的是集群名称,则会在集群中各个broker节点执行该命令;否则会在单个broker节点执行该命令。 +其中,参数"g"为全局IP白名的值,用以更新ACL配置文件中的“globalWhiteRemoteAddresses”字段的属性值。 + +| 参数 | 取值 | 含义 | +| --- | --- | --- | +| n | eg:192.168.1.2:9876 | namesrv地址(必填) | +| c | eg:DefaultCluster | 指定集群名称(与broker地址二选一) | +| b | eg:192.168.12.134:10911 | 指定broker地址(与集群名称二选一) | +| g | eg:10.10.154.1,10.10.154.2 | 全局IP白名单(必填) | + +### 7.4 查询集群/Broker的ACL配置文件版本信息 +该命令的示例如下: + +sh mqadmin clusterAclConfigVersion -n 192.168.1.2:9876 -c DefaultCluster + +说明:如果指定的是集群名称,则会在集群中各个broker节点执行该命令;否则会在单个broker节点执行该命令。 + +| 参数 | 取值 | 含义 | +| --- | --- | --- | +| n | eg:192.168.1.2:9876 | namesrv地址(必填) | +| c | eg:DefaultCluster | 指定集群名称(与broker地址二选一) | +| b | eg:192.168.12.134:10911 | 指定broker地址(与集群名称二选一) | + + +**特别注意**开启Acl鉴权认证后导致Master/Slave和Dledger模式下Broker同步数据异常的问题, +在社区[4.5.1]版本中已经修复,具体的PR链接为:https://github.com/apache/rocketmq/pull/1149; \ No newline at end of file diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index dc829c1c1b516768724932a39f15ceca82fd7676..f00dcefa4321820ba752af74c156763622e11245 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -26,6 +26,7 @@ import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.RollbackStats; @@ -33,6 +34,7 @@ import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.body.BrokerStatsData; +import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo; import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.common.protocol.body.ConsumeStatsList; @@ -160,6 +162,27 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { defaultMQAdminExtImpl.createAndUpdateTopicConfig(addr, config); } + @Override + public void createAndUpdatePlainAccessConfig(String addr, + PlainAccessConfig config) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + defaultMQAdminExtImpl.createAndUpdatePlainAccessConfig(addr, config); + } + + @Override public void deletePlainAccessConfig(String addr, + String accessKey) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + defaultMQAdminExtImpl.deletePlainAccessConfig(addr, accessKey); + } + + @Override public void updateGlobalWhiteAddrConfig(String addr, + String globalWhiteAddrs) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + defaultMQAdminExtImpl.updateGlobalWhiteAddrConfig(addr, globalWhiteAddrs); + } + + @Override public ClusterAclVersionInfo examineBrokerClusterAclVersionInfo( + String addr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + return defaultMQAdminExtImpl.examineBrokerClusterAclVersionInfo(addr); + } + @Override public void createAndUpdateSubscriptionGroupConfig(String addr, SubscriptionGroupConfig config) throws RemotingException, @@ -305,6 +328,7 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { defaultMQAdminExtImpl.deleteKvConfig(namespace, key); } + @Override public List resetOffsetByTimestampOld(String consumerGroup, String topic, long timestamp, boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index 2a7815b61709c802d7881dd821919af83019aaba..502e9daa33b65e89ab311386ed4037c182f3a4c5 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -38,6 +38,7 @@ import org.apache.rocketmq.client.impl.MQClientManager; import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.ServiceState; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.UtilAll; @@ -47,6 +48,7 @@ import org.apache.rocketmq.common.admin.RollbackStats; import org.apache.rocketmq.common.admin.TopicOffset; import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.help.FAQUrl; +import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.MessageClientExt; import org.apache.rocketmq.common.message.MessageConst; @@ -87,6 +89,7 @@ import org.apache.rocketmq.tools.admin.api.MessageTrack; import org.apache.rocketmq.tools.admin.api.TrackType; public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { + private final InternalLogger log = ClientLogger.getLog(); private final DefaultMQAdminExt defaultMQAdminExt; private ServiceState serviceState = ServiceState.CREATE_JUST; @@ -178,6 +181,27 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { this.mqClientInstance.getMQClientAPIImpl().createTopic(addr, this.defaultMQAdminExt.getCreateTopicKey(), config, timeoutMillis); } + @Override public void createAndUpdatePlainAccessConfig(String addr, + PlainAccessConfig config) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + this.mqClientInstance.getMQClientAPIImpl().createPlainAccessConfig(addr, config, timeoutMillis); + } + + @Override public void deletePlainAccessConfig(String addr, + String accessKey) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + this.mqClientInstance.getMQClientAPIImpl().deleteAccessConfig(addr, accessKey, timeoutMillis); + } + + @Override public void updateGlobalWhiteAddrConfig(String addr, + String globalWhiteAddrs) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + this.mqClientInstance.getMQClientAPIImpl().updateGlobalWhiteAddrsConfig(addr, globalWhiteAddrs, timeoutMillis); + } + + @Override + public ClusterAclVersionInfo examineBrokerClusterAclVersionInfo( + String addr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + return this.mqClientInstance.getMQClientAPIImpl().getBrokerClusterAclInfo(addr, timeoutMillis); + } + @Override public void createAndUpdateSubscriptionGroupConfig(String addr, SubscriptionGroupConfig config) throws RemotingException, @@ -548,6 +572,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { return Collections.EMPTY_MAP; } + @Override public void createOrUpdateOrderConf(String key, String value, boolean isCluster) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index 16b4427575faaa021ba97ea1828ebdab95f7bc8c..930785ec1e77337802ad080818ebca84eece2e32 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -24,6 +24,7 @@ import java.util.Set; import org.apache.rocketmq.client.MQAdmin; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.RollbackStats; @@ -31,6 +32,7 @@ import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.body.BrokerStatsData; +import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo; import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.common.protocol.body.ConsumeStatsList; @@ -68,6 +70,18 @@ public interface MQAdminExt extends MQAdmin { final TopicConfig config) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; + void createAndUpdatePlainAccessConfig(final String addr, final PlainAccessConfig plainAccessConfig) throws RemotingException, MQBrokerException, + InterruptedException, MQClientException; + + void deletePlainAccessConfig(final String addr, final String accessKey) throws RemotingException, MQBrokerException, + InterruptedException, MQClientException; + + void updateGlobalWhiteAddrConfig(final String addr, final String globalWhiteAddrs)throws RemotingException, MQBrokerException, + InterruptedException, MQClientException; + + ClusterAclVersionInfo examineBrokerClusterAclVersionInfo(final String addr) throws RemotingException, MQBrokerException, + InterruptedException, MQClientException; + void createAndUpdateSubscriptionGroupConfig(final String addr, final SubscriptionGroupConfig config) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java index f2531de4ecc9278b978b3c466ae73b4316234b36..614fed820d086975cb7968650f4cfa85ad8c4b81 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java @@ -30,6 +30,10 @@ import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.command.acl.ClusterAclConfigVersionListSubCommand; +import org.apache.rocketmq.tools.command.acl.DeleteAccessConfigSubCommand; +import org.apache.rocketmq.tools.command.acl.UpdateAccessConfigSubCommand; +import org.apache.rocketmq.tools.command.acl.UpdateGlobalWhiteAddrSubCommand; import org.apache.rocketmq.tools.command.broker.BrokerConsumeStatsSubCommad; import org.apache.rocketmq.tools.command.broker.BrokerStatusSubCommand; import org.apache.rocketmq.tools.command.broker.CleanExpiredCQSubCommand; @@ -199,6 +203,12 @@ public class MQAdminStartup { initCommand(new QueryConsumeQueueCommand()); initCommand(new SendMessageCommand()); initCommand(new ConsumeMessageCommand()); + + //for acl command + initCommand(new UpdateAccessConfigSubCommand()); + initCommand(new DeleteAccessConfigSubCommand()); + initCommand(new ClusterAclConfigVersionListSubCommand()); + initCommand(new UpdateGlobalWhiteAddrSubCommand()); } private static void initLogback() throws JoranException { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/ClusterAclConfigVersionListSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/ClusterAclConfigVersionListSubCommand.java new file mode 100644 index 0000000000000000000000000000000000000000..c1e86fbd1ad777d63acc53427986f187efa4c6c5 --- /dev/null +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/ClusterAclConfigVersionListSubCommand.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.acl; + +import java.sql.Timestamp; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Set; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionGroup; +import org.apache.commons.cli.Options; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.DataVersion; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.command.CommandUtil; +import org.apache.rocketmq.tools.command.SubCommand; +import org.apache.rocketmq.tools.command.SubCommandException; + +public class ClusterAclConfigVersionListSubCommand implements SubCommand { + + @Override public String commandName() { + return "clusterAclConfigVersion"; + } + + @Override public String commandDesc() { + return "List all of acl config version information in cluster"; + } + + @Override public Options buildCommandlineOptions(Options options) { + OptionGroup optionGroup = new OptionGroup(); + + Option opt = new Option("b", "brokerAddr", true, "query acl config version for which broker"); + optionGroup.addOption(opt); + + opt = new Option("c", "clusterName", true, "query acl config version for specified cluster"); + optionGroup.addOption(opt); + + optionGroup.setRequired(true); + options.addOptionGroup(optionGroup); + + return options; + } + + @Override public void execute(CommandLine commandLine, Options options, + RPCHook rpcHook) throws SubCommandException { + + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + + try { + + if (commandLine.hasOption('b')) { + String addr = commandLine.getOptionValue('b').trim(); + defaultMQAdminExt.start(); + printClusterBaseInfo(defaultMQAdminExt, addr); + + System.out.printf("get broker's plain access config version success.%n", addr); + return; + + } else if (commandLine.hasOption('c')) { + String clusterName = commandLine.getOptionValue('c').trim(); + + defaultMQAdminExt.start(); + + Set masterSet = + CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName); + System.out.printf("%-16s %-22s %-22s %-20s %-22s%n", + "#Cluster Name", + "#Broker Name", + "#Broker Addr", + "#AclConfigVersionNum", + "#AclLastUpdateTime" + ); + for (String addr : masterSet) { + printClusterBaseInfo(defaultMQAdminExt, addr); + } + System.out.printf("get cluster's plain access config version success.%n"); + + return; + } + + ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options); + } catch (Exception e) { + throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); + } finally { + defaultMQAdminExt.shutdown(); + } + } + + private void printClusterBaseInfo( + final DefaultMQAdminExt defaultMQAdminExt, final String addr) throws + InterruptedException, MQBrokerException, RemotingException, MQClientException { + + + ClusterAclVersionInfo clusterAclVersionInfo = defaultMQAdminExt.examineBrokerClusterAclVersionInfo(addr); + DataVersion aclDataVersion = clusterAclVersionInfo.getAclConfigDataVersion(); + String versionNum = String.valueOf(aclDataVersion.getCounter()); + + DateFormat sdf = new SimpleDateFormat(UtilAll.YYYY_MM_DD_HH_MM_SS); + String timeStampStr = sdf.format(new Timestamp(aclDataVersion.getTimestamp())); + + System.out.printf("%-16s %-22s %-22s %-20s %-22s%n", + clusterAclVersionInfo.getClusterName(), + clusterAclVersionInfo.getBrokerName(), + clusterAclVersionInfo.getBrokerAddr(), + versionNum, + timeStampStr + ); + } +} diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/DeleteAccessConfigSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/DeleteAccessConfigSubCommand.java new file mode 100644 index 0000000000000000000000000000000000000000..8570b2f7586e80b31af821e5a07313b5deb0c979 --- /dev/null +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/DeleteAccessConfigSubCommand.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.acl; + +import java.util.Set; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionGroup; +import org.apache.commons.cli.Options; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.command.CommandUtil; +import org.apache.rocketmq.tools.command.SubCommand; +import org.apache.rocketmq.tools.command.SubCommandException; + +public class DeleteAccessConfigSubCommand implements SubCommand { + + @Override + public String commandName() { + return "deleteAccessConfig"; + } + + @Override + public String commandDesc() { + return "Delete Acl Config Account in broker"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + OptionGroup optionGroup = new OptionGroup(); + + Option opt = new Option("b", "brokerAddr", true, "delete acl config account to which broker"); + optionGroup.addOption(opt); + + opt = new Option("c", "clusterName", true, "delete cl config account to which cluster"); + optionGroup.addOption(opt); + + optionGroup.setRequired(true); + options.addOptionGroup(optionGroup); + + opt = new Option("a", "accessKey", true, "set accessKey in acl config file for deleting which account"); + opt.setRequired(true); + options.addOption(opt); + + return options; + } + + @Override public void execute(CommandLine commandLine, Options options, + RPCHook rpcHook) throws SubCommandException { + + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + + try { + + String accessKey = commandLine.getOptionValue('a').trim(); + + if (commandLine.hasOption('b')) { + String addr = commandLine.getOptionValue('b').trim(); + + defaultMQAdminExt.start(); + defaultMQAdminExt.deletePlainAccessConfig(addr, accessKey); + + System.out.printf("delete plain access config account to %s success.%n", addr); + System.out.printf("account's accesskey is:%s", accessKey); + return; + + } else if (commandLine.hasOption('c')) { + String clusterName = commandLine.getOptionValue('c').trim(); + + defaultMQAdminExt.start(); + + Set masterSet = + CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName); + for (String addr : masterSet) { + defaultMQAdminExt.deletePlainAccessConfig(addr, accessKey); + System.out.printf("delete plain access config account to %s success.%n", addr); + } + + System.out.printf("account's accesskey is:%s", accessKey); + return; + } + + ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options); + } catch (Exception e) { + throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); + } finally { + defaultMQAdminExt.shutdown(); + } + } +} diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateAccessConfigSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateAccessConfigSubCommand.java new file mode 100644 index 0000000000000000000000000000000000000000..10241bf91e1970afd04c46b568cf6d4e645fbb52 --- /dev/null +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateAccessConfigSubCommand.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.acl; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionGroup; +import org.apache.commons.cli.Options; +import org.apache.rocketmq.common.PlainAccessConfig; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.command.CommandUtil; +import org.apache.rocketmq.tools.command.SubCommand; +import org.apache.rocketmq.tools.command.SubCommandException; + +public class UpdateAccessConfigSubCommand implements SubCommand { + + @Override + public String commandName() { + return "updateAclConfig"; + } + + @Override + public String commandDesc() { + return "Update acl config yaml file in broker"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + OptionGroup optionGroup = new OptionGroup(); + + Option opt = new Option("b", "brokerAddr", true, "update acl config file to which broker"); + optionGroup.addOption(opt); + + opt = new Option("c", "clusterName", true, "update cl config file to which cluster"); + optionGroup.addOption(opt); + + optionGroup.setRequired(true); + options.addOptionGroup(optionGroup); + + opt = new Option("a", "accessKey", true, "set accessKey in acl config file"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("s", "secretKey", true, "set secretKey in acl config file"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("w", "whiteRemoteAddress", true, "set white ip Address for account in acl config file"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("i", "defaultTopicPerm", true, "set default topicPerm in acl config file"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("u", "defaultGroupPerm", true, "set default GroupPerm in acl config file"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("t", "topicPerms", true, "set topicPerms list,eg: topicA=DENY,topicD=SUB"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("g", "groupPerms", true, "set groupPerms list,eg: groupD=DENY,groupD=SUB"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("m", "admin", true, "set admin flag in acl config file"); + opt.setRequired(false); + options.addOption(opt); + + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options, + RPCHook rpcHook) throws SubCommandException { + + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + + try { + PlainAccessConfig accessConfig = new PlainAccessConfig(); + accessConfig.setAccessKey(commandLine.getOptionValue('a').trim()); + // Secretkey + if (commandLine.hasOption('s')) { + accessConfig.setSecretKey(commandLine.getOptionValue('s').trim()); + } + + // Admin + if (commandLine.hasOption('m')) { + accessConfig.setAdmin(Boolean.parseBoolean(commandLine.getOptionValue('m').trim())); + } + + // DefaultTopicPerm + if (commandLine.hasOption('i')) { + accessConfig.setDefaultTopicPerm(commandLine.getOptionValue('i').trim()); + } + + // DefaultGroupPerm + if (commandLine.hasOption('u')) { + accessConfig.setDefaultGroupPerm(commandLine.getOptionValue('u').trim()); + } + + // WhiteRemoteAddress + if (commandLine.hasOption('w')) { + accessConfig.setWhiteRemoteAddress(commandLine.getOptionValue('w').trim()); + } + + // TopicPerms list value + if (commandLine.hasOption('t')) { + String[] topicPerms = commandLine.getOptionValue('t').trim().split(","); + List topicPermList = new ArrayList(); + if (topicPerms != null) { + for (String topicPerm : topicPerms) { + topicPermList.add(topicPerm); + } + } + accessConfig.setTopicPerms(topicPermList); + } + + // GroupPerms list value + if (commandLine.hasOption('g')) { + String[] groupPerms = commandLine.getOptionValue('g').trim().split(","); + List groupPermList = new ArrayList(); + if (groupPerms != null) { + for (String groupPerm : groupPerms) { + groupPermList.add(groupPerm); + } + } + accessConfig.setGroupPerms(groupPermList); + } + + if (commandLine.hasOption('b')) { + String addr = commandLine.getOptionValue('b').trim(); + + defaultMQAdminExt.start(); + defaultMQAdminExt.createAndUpdatePlainAccessConfig(addr, accessConfig); + + System.out.printf("create or update plain access config to %s success.%n", addr); + System.out.printf("%s", accessConfig); + return; + + } else if (commandLine.hasOption('c')) { + String clusterName = commandLine.getOptionValue('c').trim(); + + defaultMQAdminExt.start(); + Set masterSet = + CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName); + for (String addr : masterSet) { + defaultMQAdminExt.createAndUpdatePlainAccessConfig(addr, accessConfig); + System.out.printf("create or update plain access config to %s success.%n", addr); + } + + System.out.printf("%s", accessConfig); + return; + } + + ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options); + } catch (Exception e) { + throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); + } finally { + defaultMQAdminExt.shutdown(); + } + } +} diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateGlobalWhiteAddrSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateGlobalWhiteAddrSubCommand.java new file mode 100644 index 0000000000000000000000000000000000000000..ef9d9407ced92504947f546f4e6dd88ecf3311f8 --- /dev/null +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateGlobalWhiteAddrSubCommand.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.acl; + +import java.util.Set; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionGroup; +import org.apache.commons.cli.Options; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.command.CommandUtil; +import org.apache.rocketmq.tools.command.SubCommand; +import org.apache.rocketmq.tools.command.SubCommandException; + +public class UpdateGlobalWhiteAddrSubCommand implements SubCommand { + + @Override public String commandName() { + return "updateGlobalWhiteAddr"; + } + + @Override public String commandDesc() { + return "Update global white address for acl Config File in broker"; + } + + @Override public Options buildCommandlineOptions(Options options) { + + OptionGroup optionGroup = new OptionGroup(); + + Option opt = new Option("b", "brokerAddr", true, "update global white address to which broker"); + optionGroup.addOption(opt); + + opt = new Option("c", "clusterName", true, "update global white address to which cluster"); + optionGroup.addOption(opt); + + optionGroup.setRequired(true); + options.addOptionGroup(optionGroup); + + opt = new Option("g", "globalWhiteRemoteAddresses", true, "set globalWhiteRemoteAddress list,eg: 10.10.103.*,192.168.0.*"); + opt.setRequired(true); + options.addOption(opt); + + return options; + } + + @Override public void execute(CommandLine commandLine, Options options, + RPCHook rpcHook) throws SubCommandException { + + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + + try { + // GlobalWhiteRemoteAddresses list value + String globalWhiteRemoteAddresses = commandLine.getOptionValue('g').trim(); + + + if (commandLine.hasOption('b')) { + String addr = commandLine.getOptionValue('b').trim(); + + defaultMQAdminExt.start(); + defaultMQAdminExt.updateGlobalWhiteAddrConfig(addr, globalWhiteRemoteAddresses); + + System.out.printf("update global white remote addresses to %s success.%n", addr); + return; + + } else if (commandLine.hasOption('c')) { + String clusterName = commandLine.getOptionValue('c').trim(); + + defaultMQAdminExt.start(); + Set masterSet = + CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName); + for (String addr : masterSet) { + defaultMQAdminExt.updateGlobalWhiteAddrConfig(addr, globalWhiteRemoteAddresses); + System.out.printf("update global white remote addresses to %s success.%n", addr); + } + return; + } + + ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options); + } catch (Exception e) { + throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); + } finally { + defaultMQAdminExt.shutdown(); + } + } +} diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/acl/ClusterAclConfigVersionListSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/acl/ClusterAclConfigVersionListSubCommandTest.java new file mode 100644 index 0000000000000000000000000000000000000000..ba8baa3e624ee258b97bd828187b56085bd46c3e --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/acl/ClusterAclConfigVersionListSubCommandTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.acl; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class ClusterAclConfigVersionListSubCommandTest { + + @Test + public void testExecute() { + ClusterAclConfigVersionListSubCommand cmd = new ClusterAclConfigVersionListSubCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-c default-cluster"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + assertThat(commandLine.getOptionValue('c').trim()).isEqualTo("default-cluster"); + } +} diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/acl/DeleteAccessConfigSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/acl/DeleteAccessConfigSubCommandTest.java new file mode 100644 index 0000000000000000000000000000000000000000..74092f45f7933575e6e88eb8bbe955504b013b90 --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/acl/DeleteAccessConfigSubCommandTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.acl; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class DeleteAccessConfigSubCommandTest { + + @Test + public void testExecute() { + DeleteAccessConfigSubCommand cmd = new DeleteAccessConfigSubCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-a unit-test", "-c default-cluster"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + assertThat(commandLine.getOptionValue('a').trim()).isEqualTo("unit-test"); + assertThat(commandLine.getOptionValue('c').trim()).isEqualTo("default-cluster"); + } +} diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/acl/UpdateAccessConfigSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/acl/UpdateAccessConfigSubCommandTest.java new file mode 100644 index 0000000000000000000000000000000000000000..2c133a252eca2aa9cf8273f898e1545555ce91cb --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/acl/UpdateAccessConfigSubCommandTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.acl; + +import java.util.ArrayList; +import java.util.List; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.common.PlainAccessConfig; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.junit.Assert; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class UpdateAccessConfigSubCommandTest { + + @Test + public void testExecute() { + UpdateAccessConfigSubCommand cmd = new UpdateAccessConfigSubCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] { + "-b 127.0.0.1:10911", + "-a RocketMQ", + "-s 12345678", + "-w 192.168.0.*", + "-i DENY", + "-u SUB", + "-t topicA=DENY;topicB=PUB|SUB", + "-g groupA=DENY;groupB=SUB", + "-m true"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + assertThat(commandLine.getOptionValue('b').trim()).isEqualTo("127.0.0.1:10911"); + assertThat(commandLine.getOptionValue('a').trim()).isEqualTo("RocketMQ"); + assertThat(commandLine.getOptionValue('s').trim()).isEqualTo("12345678"); + assertThat(commandLine.getOptionValue('w').trim()).isEqualTo("192.168.0.*"); + assertThat(commandLine.getOptionValue('i').trim()).isEqualTo("DENY"); + assertThat(commandLine.getOptionValue('u').trim()).isEqualTo("SUB"); + assertThat(commandLine.getOptionValue('t').trim()).isEqualTo("topicA=DENY;topicB=PUB|SUB"); + assertThat(commandLine.getOptionValue('g').trim()).isEqualTo("groupA=DENY;groupB=SUB"); + assertThat(commandLine.getOptionValue('m').trim()).isEqualTo("true"); + + PlainAccessConfig accessConfig = new PlainAccessConfig(); + + // topicPerms list value + if (commandLine.hasOption('t')) { + String[] topicPerms = commandLine.getOptionValue('t').trim().split(";"); + List topicPermList = new ArrayList(); + if (topicPerms != null) { + for (String topicPerm : topicPerms) { + topicPermList.add(topicPerm); + } + } + accessConfig.setTopicPerms(topicPermList); + } + + // groupPerms list value + if (commandLine.hasOption('g')) { + String[] groupPerms = commandLine.getOptionValue('g').trim().split(";"); + List groupPermList = new ArrayList(); + if (groupPerms != null) { + for (String groupPerm : groupPerms) { + groupPermList.add(groupPerm); + } + } + accessConfig.setGroupPerms(groupPermList); + } + + Assert.assertTrue(accessConfig.getTopicPerms().contains("topicB=PUB|SUB")); + Assert.assertTrue(accessConfig.getGroupPerms().contains("groupB=SUB")); + + } +} diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/acl/UpdateGlobalWhiteAddrSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/acl/UpdateGlobalWhiteAddrSubCommandTest.java new file mode 100644 index 0000000000000000000000000000000000000000..66d609dfd21d4529ede451efe2a6a4a3f250e77b --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/acl/UpdateGlobalWhiteAddrSubCommandTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.acl; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class UpdateGlobalWhiteAddrSubCommandTest { + + @Test + public void testExecute() { + UpdateGlobalWhiteAddrSubCommand cmd = new UpdateGlobalWhiteAddrSubCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-g 10.10.103.*,192.168.0.*", "-c default-cluster"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + assertThat(commandLine.getOptionValue('g').trim()).isEqualTo("10.10.103.*,192.168.0.*"); + assertThat(commandLine.getOptionValue('c').trim()).isEqualTo("default-cluster"); + } +} diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommandTest.java index 5ea03d6584bbb1d57b716d4528ebb2b8862db844..7e7863f6dc2a7eb0f49d91a2320e9b1325a7bd5f 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommandTest.java @@ -31,7 +31,6 @@ public class UpdateTopicSubCommandTest { Options options = ServerUtil.buildCommandlineOptions(new Options()); String[] subargs = new String[] { "-b 127.0.0.1:10911", - "-c default-cluster", "-t unit-test", "-r 8", "-w 8", @@ -42,7 +41,6 @@ public class UpdateTopicSubCommandTest { final CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); assertThat(commandLine.getOptionValue('b').trim()).isEqualTo("127.0.0.1:10911"); - assertThat(commandLine.getOptionValue('c').trim()).isEqualTo("default-cluster"); assertThat(commandLine.getOptionValue('r').trim()).isEqualTo("8"); assertThat(commandLine.getOptionValue('w').trim()).isEqualTo("8"); assertThat(commandLine.getOptionValue('t').trim()).isEqualTo("unit-test");