未验证 提交 a5d34dd4 编写于 作者: H Heng Du 提交者: GitHub

Merge pull request #1257 from apache/enchanced_acl

[ISSUE #1156]Add new mqadmin API for ACL configuration
...@@ -17,9 +17,12 @@ ...@@ -17,9 +17,12 @@
package org.apache.rocketmq.acl; package org.apache.rocketmq.acl;
import java.util.List;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public interface AccessValidator { public interface AccessValidator {
/** /**
* Parse to get the AccessResource(user, resource, needed permission) * Parse to get the AccessResource(user, resource, needed permission)
* *
...@@ -35,4 +38,32 @@ public interface AccessValidator { ...@@ -35,4 +38,32 @@ public interface AccessValidator {
* @param accessResource * @param accessResource
*/ */
void validate(AccessResource accessResource); void validate(AccessResource accessResource);
/**
* Update the access resource config
*
* @param plainAccessConfig
* @return
*/
boolean updateAccessConfig(PlainAccessConfig plainAccessConfig);
/**
* Delete the access resource config
*
* @return
*/
boolean deleteAccessConfig(String accesskey);
/**
* Get the access resource config version information
*
* @return
*/
String getAclConfigVersion();
/**
* Update globalWhiteRemoteAddresses in acl yaml config file
* @return
*/
boolean updateGlobalWhiteAddrsConfig(List<String> globalWhiteAddrsList);
} }
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.common;
public class AclConstants {
public static final String CONFIG_GLOBAL_WHITE_ADDRS = "globalWhiteRemoteAddresses";
public static final String CONFIG_ACCOUNTS = "accounts";
public static final String CONFIG_ACCESS_KEY = "accessKey";
public static final String CONFIG_SECRET_KEY = "secretKey";
public static final String CONFIG_WHITE_ADDR = "whiteRemoteAddress";
public static final String CONFIG_ADMIN_ROLE = "admin";
public static final String CONFIG_DEFAULT_TOPIC_PERM = "defaultTopicPerm";
public static final String CONFIG_DEFAULT_GROUP_PERM = "defaultGroupPerm";
public static final String CONFIG_TOPIC_PERMS = "topicPerms";
public static final String CONFIG_GROUP_PERMS = "groupPerms";
public static final String CONFIG_DATA_VERSION = "dataVersion";
public static final String CONFIG_COUNTER = "counter";
public static final String CONFIG_TIME_STAMP = "timestamp";
public static final int ACCESS_KEY_MIN_LENGTH = 6;
public static final int SECRET_KEY_MIN_LENGTH = 6;
}
...@@ -20,7 +20,9 @@ import com.alibaba.fastjson.JSONObject; ...@@ -20,7 +20,9 @@ import com.alibaba.fastjson.JSONObject;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter;
import java.util.Map; import java.util.Map;
import java.util.SortedMap; import java.util.SortedMap;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
...@@ -48,7 +50,7 @@ public class AclUtils { ...@@ -48,7 +50,7 @@ public class AclUtils {
return AclUtils.combineBytes(sb.toString().getBytes(CHARSET), request.getBody()); return AclUtils.combineBytes(sb.toString().getBytes(CHARSET), request.getBody());
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException("incompatible exception.", e); throw new RuntimeException("Incompatible exception.", e);
} }
} }
...@@ -69,7 +71,7 @@ public class AclUtils { ...@@ -69,7 +71,7 @@ public class AclUtils {
public static void verify(String netaddress, int index) { public static void verify(String netaddress, int index) {
if (!AclUtils.isScope(netaddress, index)) { if (!AclUtils.isScope(netaddress, index)) {
throw new AclException(String.format("netaddress examine scope Exception netaddress is %s", netaddress)); throw new AclException(String.format("Netaddress examine scope Exception netaddress is %s", netaddress));
} }
} }
...@@ -127,11 +129,11 @@ public class AclUtils { ...@@ -127,11 +129,11 @@ public class AclUtils {
} }
public static <T> T getYamlDataObject(String path, Class<T> clazz) { public static <T> T getYamlDataObject(String path, Class<T> clazz) {
Yaml ymal = new Yaml(); Yaml yaml = new Yaml();
FileInputStream fis = null; FileInputStream fis = null;
try { try {
fis = new FileInputStream(new File(path)); fis = new FileInputStream(new File(path));
return ymal.loadAs(fis, clazz); return yaml.loadAs(fis, clazz);
} catch (FileNotFoundException ignore) { } catch (FileNotFoundException ignore) {
return null; return null;
} catch (Exception e) { } catch (Exception e) {
...@@ -146,13 +148,31 @@ public class AclUtils { ...@@ -146,13 +148,31 @@ public class AclUtils {
} }
} }
public static boolean writeDataObject(String path, Map<String,Object> dataMap) {
Yaml yaml = new Yaml();
PrintWriter pw = null;
try {
pw = new PrintWriter(new FileWriter(path));
String dumpAsMap = yaml.dumpAsMap(dataMap);
pw.print(dumpAsMap);
pw.flush();
} catch (Exception e) {
throw new AclException(e.getMessage());
} finally {
if (pw != null) {
pw.close();
}
}
return true;
}
public static RPCHook getAclRPCHook(String fileName) { public static RPCHook getAclRPCHook(String fileName) {
JSONObject yamlDataObject = null; JSONObject yamlDataObject = null;
try { try {
yamlDataObject = AclUtils.getYamlDataObject(fileName, yamlDataObject = AclUtils.getYamlDataObject(fileName,
JSONObject.class); JSONObject.class);
} catch (Exception e) { } catch (Exception e) {
log.error("convert yaml file to data object error, ",e); log.error("Convert yaml file to data object error, ",e);
return null; return null;
} }
...@@ -161,8 +181,8 @@ public class AclUtils { ...@@ -161,8 +181,8 @@ public class AclUtils {
return null; return null;
} }
String accessKey = yamlDataObject.getString("accessKey"); String accessKey = yamlDataObject.getString(AclConstants.CONFIG_ACCESS_KEY);
String secretKey = yamlDataObject.getString("secretKey"); String secretKey = yamlDataObject.getString(AclConstants.CONFIG_SECRET_KEY);
if (StringUtils.isBlank(accessKey) || StringUtils.isBlank(secretKey)) { if (StringUtils.isBlank(accessKey) || StringUtils.isBlank(secretKey)) {
log.warn("AccessKey or secretKey is blank, the acl is not enabled."); log.warn("AccessKey or secretKey is blank, the acl is not enabled.");
......
...@@ -43,7 +43,7 @@ public class PlainAccessResource implements AccessResource { ...@@ -43,7 +43,7 @@ public class PlainAccessResource implements AccessResource {
private int requestCode; private int requestCode;
//the content to calculate the content // The content to calculate the content
private byte[] content; private byte[] content;
private String signature; private String signature;
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
*/ */
package org.apache.rocketmq.acl.plain; package org.apache.rocketmq.acl.plain;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.SortedMap; import java.util.SortedMap;
import java.util.TreeMap; import java.util.TreeMap;
...@@ -25,6 +26,7 @@ import org.apache.rocketmq.acl.common.AclException; ...@@ -25,6 +26,7 @@ import org.apache.rocketmq.acl.common.AclException;
import org.apache.rocketmq.acl.common.AclUtils; import org.apache.rocketmq.acl.common.AclUtils;
import org.apache.rocketmq.acl.common.Permission; import org.apache.rocketmq.acl.common.Permission;
import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader; import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader;
import org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader; import org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader;
...@@ -38,10 +40,10 @@ import static org.apache.rocketmq.acl.plain.PlainAccessResource.getRetryTopic; ...@@ -38,10 +40,10 @@ import static org.apache.rocketmq.acl.plain.PlainAccessResource.getRetryTopic;
public class PlainAccessValidator implements AccessValidator { public class PlainAccessValidator implements AccessValidator {
private PlainPermissionLoader aclPlugEngine; private PlainPermissionManager aclPlugEngine;
public PlainAccessValidator() { public PlainAccessValidator() {
aclPlugEngine = new PlainPermissionLoader(); aclPlugEngine = new PlainPermissionManager();
} }
@Override @Override
...@@ -56,8 +58,8 @@ public class PlainAccessValidator implements AccessValidator { ...@@ -56,8 +58,8 @@ public class PlainAccessValidator implements AccessValidator {
accessResource.setRequestCode(request.getCode()); accessResource.setRequestCode(request.getCode());
if (request.getExtFields() == null) { if (request.getExtFields() == null) {
//If request's extFields is null,then return accessResource directly(users can use whiteAddress pattern) // If request's extFields is null,then return accessResource directly(users can use whiteAddress pattern)
//The following logic codes depend on the request's extFields not to be null. // The following logic codes depend on the request's extFields not to be null.
return accessResource; return accessResource;
} }
accessResource.setAccessKey(request.getExtFields().get(SessionCredentials.ACCESS_KEY)); accessResource.setAccessKey(request.getExtFields().get(SessionCredentials.ACCESS_KEY));
...@@ -135,4 +137,22 @@ public class PlainAccessValidator implements AccessValidator { ...@@ -135,4 +137,22 @@ public class PlainAccessValidator implements AccessValidator {
aclPlugEngine.validate((PlainAccessResource) accessResource); aclPlugEngine.validate((PlainAccessResource) accessResource);
} }
@Override
public boolean updateAccessConfig(PlainAccessConfig plainAccessConfig) {
return aclPlugEngine.updateAccessConfig(plainAccessConfig);
}
@Override
public boolean deleteAccessConfig(String accesskey) {
return aclPlugEngine.deleteAccessConfig(accesskey);
}
@Override public String getAclConfigVersion() {
return aclPlugEngine.getAclConfigDataVersion();
}
@Override public boolean updateGlobalWhiteAddrsConfig(List<String> globalWhiteAddrsList) {
return aclPlugEngine.updateGlobalWhiteAddrsConfig(globalWhiteAddrsList);
}
} }
...@@ -21,27 +21,29 @@ import com.alibaba.fastjson.JSONObject; ...@@ -21,27 +21,29 @@ import com.alibaba.fastjson.JSONObject;
import java.io.File; import java.io.File;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock; import org.apache.commons.lang3.StringUtils;
import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.rocketmq.acl.common.AclConstants;
import org.apache.rocketmq.acl.common.AclException; import org.apache.rocketmq.acl.common.AclException;
import org.apache.rocketmq.acl.common.AclUtils; import org.apache.rocketmq.acl.common.AclUtils;
import org.apache.rocketmq.acl.common.Permission; import org.apache.rocketmq.acl.common.Permission;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.srvutil.FileWatchService; import org.apache.rocketmq.srvutil.FileWatchService;
public class PlainPermissionLoader { public class PlainPermissionManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
private static final String DEFAULT_PLAIN_ACL_FILE = "/conf/plain_acl.yml"; private static final String DEFAULT_PLAIN_ACL_FILE = "/conf/plain_acl.yml";
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private String fileHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, private String fileHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY,
System.getenv(MixAll.ROCKETMQ_HOME_ENV)); System.getenv(MixAll.ROCKETMQ_HOME_ENV));
...@@ -55,7 +57,9 @@ public class PlainPermissionLoader { ...@@ -55,7 +57,9 @@ public class PlainPermissionLoader {
private boolean isWatchStart; private boolean isWatchStart;
public PlainPermissionLoader() { private final DataVersion dataVersion = new DataVersion();
public PlainPermissionManager() {
load(); load();
watch(); watch();
} }
...@@ -67,7 +71,6 @@ public class PlainPermissionLoader { ...@@ -67,7 +71,6 @@ public class PlainPermissionLoader {
JSONObject plainAclConfData = AclUtils.getYamlDataObject(fileHome + File.separator + fileName, JSONObject plainAclConfData = AclUtils.getYamlDataObject(fileHome + File.separator + fileName,
JSONObject.class); JSONObject.class);
if (plainAclConfData == null || plainAclConfData.isEmpty()) { if (plainAclConfData == null || plainAclConfData.isEmpty()) {
throw new AclException(String.format("%s file is not data", fileHome + File.separator + fileName)); throw new AclException(String.format("%s file is not data", fileHome + File.separator + fileName));
} }
...@@ -80,7 +83,7 @@ public class PlainPermissionLoader { ...@@ -80,7 +83,7 @@ public class PlainPermissionLoader {
} }
} }
JSONArray accounts = plainAclConfData.getJSONArray("accounts"); JSONArray accounts = plainAclConfData.getJSONArray(AclConstants.CONFIG_ACCOUNTS);
if (accounts != null && !accounts.isEmpty()) { if (accounts != null && !accounts.isEmpty()) {
List<PlainAccessConfig> plainAccessConfigList = accounts.toJavaList(PlainAccessConfig.class); List<PlainAccessConfig> plainAccessConfigList = accounts.toJavaList(PlainAccessConfig.class);
for (PlainAccessConfig plainAccessConfig : plainAccessConfigList) { for (PlainAccessConfig plainAccessConfig : plainAccessConfigList) {
...@@ -89,10 +92,184 @@ public class PlainPermissionLoader { ...@@ -89,10 +92,184 @@ public class PlainPermissionLoader {
} }
} }
// For loading dataversion part just
JSONArray tempDataVersion = plainAclConfData.getJSONArray(AclConstants.CONFIG_DATA_VERSION);
if (tempDataVersion != null && !tempDataVersion.isEmpty()) {
List<DataVersion> dataVersion = tempDataVersion.toJavaList(DataVersion.class);
DataVersion firstElement = dataVersion.get(0);
this.dataVersion.assignNewOne(firstElement);
}
this.globalWhiteRemoteAddressStrategy = globalWhiteRemoteAddressStrategy; this.globalWhiteRemoteAddressStrategy = globalWhiteRemoteAddressStrategy;
this.plainAccessResourceMap = plainAccessResourceMap; this.plainAccessResourceMap = plainAccessResourceMap;
} }
public String getAclConfigDataVersion() {
return this.dataVersion.toJson();
}
private Map<String, Object> updateAclConfigFileVersion(Map<String, Object> updateAclConfigMap) {
dataVersion.nextVersion();
List<Map<String, Object>> versionElement = new ArrayList<Map<String, Object>>();
Map<String, Object> accountsMap = new LinkedHashMap<String, Object>() {
{
put(AclConstants.CONFIG_COUNTER, dataVersion.getCounter().longValue());
put(AclConstants.CONFIG_TIME_STAMP, dataVersion.getTimestamp());
}
};
versionElement.add(accountsMap);
updateAclConfigMap.put(AclConstants.CONFIG_DATA_VERSION, versionElement);
return updateAclConfigMap;
}
public boolean updateAccessConfig(PlainAccessConfig plainAccessConfig) {
if (plainAccessConfig == null) {
log.error("Parameter value plainAccessConfig is null,Please check your parameter");
return false;
}
Map<String, Object> aclAccessConfigMap = AclUtils.getYamlDataObject(fileHome + File.separator + fileName,
Map.class);
List<Map<String, Object>> accounts = (List<Map<String, Object>>) aclAccessConfigMap.get(AclConstants.CONFIG_ACCOUNTS);
Map<String, Object> updateAccountMap = null;
if (accounts != null) {
for (Map<String, Object> account : accounts) {
if (account.get(AclConstants.CONFIG_ACCESS_KEY).equals(plainAccessConfig.getAccessKey())) {
// Update acl access config elements
accounts.remove(account);
updateAccountMap = createAclAccessConfigMap(account, plainAccessConfig);
accounts.add(updateAccountMap);
aclAccessConfigMap.put(AclConstants.CONFIG_ACCOUNTS, accounts);
if (AclUtils.writeDataObject(fileHome + File.separator + fileName, updateAclConfigFileVersion(aclAccessConfigMap))) {
return true;
}
return false;
}
}
// Create acl access config elements
accounts.add(createAclAccessConfigMap(null, plainAccessConfig));
aclAccessConfigMap.put(AclConstants.CONFIG_ACCOUNTS, accounts);
if (AclUtils.writeDataObject(fileHome + File.separator + fileName, updateAclConfigFileVersion(aclAccessConfigMap))) {
return true;
}
return false;
}
log.error("Users must ensure that the acl yaml config file has accounts node element");
return false;
}
private Map<String, Object> createAclAccessConfigMap(Map<String, Object> existedAccoutMap, PlainAccessConfig plainAccessConfig) {
Map<String, Object> newAccountsMap = null;
if (existedAccoutMap == null) {
newAccountsMap = new LinkedHashMap<String, Object>();
} else {
newAccountsMap = existedAccoutMap;
}
if (StringUtils.isEmpty(plainAccessConfig.getAccessKey()) ||
plainAccessConfig.getAccessKey().length() <= AclConstants.ACCESS_KEY_MIN_LENGTH) {
throw new AclException(String.format(
"The accessKey=%s cannot be null and length should longer than 6",
plainAccessConfig.getAccessKey()));
}
newAccountsMap.put(AclConstants.CONFIG_ACCESS_KEY, plainAccessConfig.getAccessKey());
if (!StringUtils.isEmpty(plainAccessConfig.getSecretKey())) {
if (plainAccessConfig.getSecretKey().length() <= AclConstants.SECRET_KEY_MIN_LENGTH) {
throw new AclException(String.format(
"The secretKey=%s value length should longer than 6",
plainAccessConfig.getSecretKey()));
}
newAccountsMap.put(AclConstants.CONFIG_SECRET_KEY, (String) plainAccessConfig.getSecretKey());
}
if (!StringUtils.isEmpty(plainAccessConfig.getWhiteRemoteAddress())) {
newAccountsMap.put(AclConstants.CONFIG_WHITE_ADDR, plainAccessConfig.getWhiteRemoteAddress());
}
if (!StringUtils.isEmpty(String.valueOf(plainAccessConfig.isAdmin()))) {
newAccountsMap.put(AclConstants.CONFIG_ADMIN_ROLE, plainAccessConfig.isAdmin());
}
if (!StringUtils.isEmpty(plainAccessConfig.getDefaultTopicPerm())) {
newAccountsMap.put(AclConstants.CONFIG_DEFAULT_TOPIC_PERM, plainAccessConfig.getDefaultTopicPerm());
}
if (!StringUtils.isEmpty(plainAccessConfig.getDefaultGroupPerm())) {
newAccountsMap.put(AclConstants.CONFIG_DEFAULT_GROUP_PERM, plainAccessConfig.getDefaultGroupPerm());
}
if (plainAccessConfig.getTopicPerms() != null && !plainAccessConfig.getTopicPerms().isEmpty()) {
newAccountsMap.put(AclConstants.CONFIG_TOPIC_PERMS, plainAccessConfig.getTopicPerms());
}
if (plainAccessConfig.getGroupPerms() != null && !plainAccessConfig.getGroupPerms().isEmpty()) {
newAccountsMap.put(AclConstants.CONFIG_GROUP_PERMS, plainAccessConfig.getGroupPerms());
}
return newAccountsMap;
}
public boolean deleteAccessConfig(String accesskey) {
if (StringUtils.isEmpty(accesskey)) {
log.error("Parameter value accesskey is null or empty String,Please check your parameter");
return false;
}
Map<String, Object> aclAccessConfigMap = AclUtils.getYamlDataObject(fileHome + File.separator + fileName,
Map.class);
List<Map<String, Object>> accounts = (List<Map<String, Object>>) aclAccessConfigMap.get("accounts");
if (accounts != null) {
Iterator<Map<String, Object>> itemIterator = accounts.iterator();
while (itemIterator.hasNext()) {
if (itemIterator.next().get(AclConstants.CONFIG_ACCESS_KEY).equals(accesskey)) {
// Delete the related acl config element
itemIterator.remove();
aclAccessConfigMap.put(AclConstants.CONFIG_ACCOUNTS, accounts);
if (AclUtils.writeDataObject(fileHome + File.separator + fileName, updateAclConfigFileVersion(aclAccessConfigMap))) {
return true;
}
return false;
}
}
}
log.error("Users must ensure that the acl yaml config file has related acl config elements");
return false;
}
public boolean updateGlobalWhiteAddrsConfig(List<String> globalWhiteAddrsList) {
if (globalWhiteAddrsList == null) {
log.error("Parameter value globalWhiteAddrsList is null,Please check your parameter");
return false;
}
Map<String, Object> aclAccessConfigMap = AclUtils.getYamlDataObject(fileHome + File.separator + fileName,
Map.class);
List<String> globalWhiteRemoteAddrList = (List<String>) aclAccessConfigMap.get(AclConstants.CONFIG_GLOBAL_WHITE_ADDRS);
if (globalWhiteRemoteAddrList != null) {
globalWhiteRemoteAddrList.clear();
globalWhiteRemoteAddrList.addAll(globalWhiteAddrsList);
// Update globalWhiteRemoteAddr element in memeory map firstly
aclAccessConfigMap.put(AclConstants.CONFIG_GLOBAL_WHITE_ADDRS,globalWhiteRemoteAddrList);
if (AclUtils.writeDataObject(fileHome + File.separator + fileName, updateAclConfigFileVersion(aclAccessConfigMap))) {
return true;
}
return false;
}
log.error("Users must ensure that the acl yaml config file has globalWhiteRemoteAddresses flag firstly");
return false;
}
private void watch() { private void watch() {
try { try {
String watchFilePath = fileHome + fileName; String watchFilePath = fileHome + fileName;
...@@ -156,8 +333,8 @@ public class PlainPermissionLoader { ...@@ -156,8 +333,8 @@ public class PlainPermissionLoader {
public PlainAccessResource buildPlainAccessResource(PlainAccessConfig plainAccessConfig) throws AclException { public PlainAccessResource buildPlainAccessResource(PlainAccessConfig plainAccessConfig) throws AclException {
if (plainAccessConfig.getAccessKey() == null if (plainAccessConfig.getAccessKey() == null
|| plainAccessConfig.getSecretKey() == null || plainAccessConfig.getSecretKey() == null
|| plainAccessConfig.getAccessKey().length() <= 6 || plainAccessConfig.getAccessKey().length() <= AclConstants.ACCESS_KEY_MIN_LENGTH
|| plainAccessConfig.getSecretKey().length() <= 6) { || plainAccessConfig.getSecretKey().length() <= AclConstants.SECRET_KEY_MIN_LENGTH) {
throw new AclException(String.format( throw new AclException(String.format(
"The accessKey=%s and secretKey=%s cannot be null and length should longer than 6", "The accessKey=%s and secretKey=%s cannot be null and length should longer than 6",
plainAccessConfig.getAccessKey(), plainAccessConfig.getSecretKey())); plainAccessConfig.getAccessKey(), plainAccessConfig.getSecretKey()));
...@@ -217,89 +394,4 @@ public class PlainPermissionLoader { ...@@ -217,89 +394,4 @@ public class PlainPermissionLoader {
public boolean isWatchStart() { public boolean isWatchStart() {
return isWatchStart; return isWatchStart;
} }
static class PlainAccessConfig {
private String accessKey;
private String secretKey;
private String whiteRemoteAddress;
private boolean admin;
private String defaultTopicPerm;
private String defaultGroupPerm;
private List<String> topicPerms;
private List<String> groupPerms;
public String getAccessKey() {
return accessKey;
}
public void setAccessKey(String accessKey) {
this.accessKey = accessKey;
}
public String getSecretKey() {
return secretKey;
}
public void setSecretKey(String secretKey) {
this.secretKey = secretKey;
}
public String getWhiteRemoteAddress() {
return whiteRemoteAddress;
}
public void setWhiteRemoteAddress(String whiteRemoteAddress) {
this.whiteRemoteAddress = whiteRemoteAddress;
}
public boolean isAdmin() {
return admin;
}
public void setAdmin(boolean admin) {
this.admin = admin;
}
public String getDefaultTopicPerm() {
return defaultTopicPerm;
}
public void setDefaultTopicPerm(String defaultTopicPerm) {
this.defaultTopicPerm = defaultTopicPerm;
}
public String getDefaultGroupPerm() {
return defaultGroupPerm;
}
public void setDefaultGroupPerm(String defaultGroupPerm) {
this.defaultGroupPerm = defaultGroupPerm;
}
public List<String> getTopicPerms() {
return topicPerms;
}
public void setTopicPerms(List<String> topicPerms) {
this.topicPerms = topicPerms;
}
public List<String> getGroupPerms() {
return groupPerms;
}
public void setGroupPerms(List<String> groupPerms) {
this.groupPerms = groupPerms;
}
}
} }
...@@ -17,7 +17,11 @@ ...@@ -17,7 +17,11 @@
package org.apache.rocketmq.acl.common; package org.apache.rocketmq.acl.common;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
...@@ -131,10 +135,77 @@ public class AclUtilsTest { ...@@ -131,10 +135,77 @@ public class AclUtilsTest {
@Test @Test
public void getYamlDataObjectTest() { public void getYamlDataObjectTest() {
Map<String, Object> map = AclUtils.getYamlDataObject("src/test/resources/conf/plain_acl.yml", Map.class); Map<String, Object> map = AclUtils.getYamlDataObject("src/test/resources/conf/plain_acl_correct.yml", Map.class);
Assert.assertFalse(map.isEmpty()); Assert.assertFalse(map.isEmpty());
} }
@Test
public void writeDataObject2YamlFileTest() throws IOException{
String targetFileName = "src/test/resources/conf/plain_write_acl.yml";
File transport = new File(targetFileName);
transport.delete();
transport.createNewFile();
Map<String, Object> aclYamlMap = new HashMap<String, Object>();
// For globalWhiteRemoteAddrs element in acl yaml config file
List<String> globalWhiteRemoteAddrs = new ArrayList<String>();
globalWhiteRemoteAddrs.add("10.10.103.*");
globalWhiteRemoteAddrs.add("192.168.0.*");
aclYamlMap.put("globalWhiteRemoteAddrs",globalWhiteRemoteAddrs);
// For accounts element in acl yaml config file
List<Map<String, Object>> accounts = new ArrayList<Map<String, Object>>();
Map<String, Object> accountsMap = new LinkedHashMap<String, Object>() {
{
put("accessKey", "RocketMQ");
put("secretKey", "12345678");
put("whiteRemoteAddress", "whiteRemoteAddress");
put("admin", "true");
}
};
accounts.add(accountsMap);
aclYamlMap.put("accounts",accounts);
Assert.assertTrue(AclUtils.writeDataObject(targetFileName, aclYamlMap));
transport.delete();
}
@Test
public void updateExistedYamlFileTest() throws IOException{
String targetFileName = "src/test/resources/conf/plain_update_acl.yml";
File transport = new File(targetFileName);
transport.delete();
transport.createNewFile();
Map<String, Object> aclYamlMap = new HashMap<String, Object>();
// For globalWhiteRemoteAddrs element in acl yaml config file
List<String> globalWhiteRemoteAddrs = new ArrayList<String>();
globalWhiteRemoteAddrs.add("10.10.103.*");
globalWhiteRemoteAddrs.add("192.168.0.*");
aclYamlMap.put("globalWhiteRemoteAddrs",globalWhiteRemoteAddrs);
// Write file to yaml file
AclUtils.writeDataObject(targetFileName, aclYamlMap);
Map<String, Object> updatedMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
List<String> globalWhiteRemoteAddrList = (List<String>) updatedMap.get("globalWhiteRemoteAddrs");
globalWhiteRemoteAddrList.clear();
globalWhiteRemoteAddrList.add("192.168.1.2");
// Update file and flush to yaml file
AclUtils.writeDataObject(targetFileName, updatedMap);
Map<String, Object> readableMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
List<String> updatedGlobalWhiteRemoteAddrs = (List<String>) readableMap.get("globalWhiteRemoteAddrs");
Assert.assertEquals("192.168.1.2",updatedGlobalWhiteRemoteAddrs.get(0));
transport.delete();
}
@Test @Test
public void getYamlDataIgnoreFileNotFoundExceptionTest() { public void getYamlDataIgnoreFileNotFoundExceptionTest() {
......
...@@ -16,14 +16,20 @@ ...@@ -16,14 +16,20 @@
*/ */
package org.apache.rocketmq.acl.plain; package org.apache.rocketmq.acl.plain;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.AclConstants;
import org.apache.rocketmq.acl.common.AclException; import org.apache.rocketmq.acl.common.AclException;
import org.apache.rocketmq.acl.common.AclUtils; import org.apache.rocketmq.acl.common.AclUtils;
import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.header.*; import org.apache.rocketmq.common.protocol.header.*;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData; import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
...@@ -297,4 +303,262 @@ public class PlainAccessValidatorTest { ...@@ -297,4 +303,262 @@ public class PlainAccessValidatorTest {
PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), whiteRemoteAddress); PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), whiteRemoteAddress);
plainAccessValidator.validate(accessResource); plainAccessValidator.validate(accessResource);
} }
@Test
public void updateAccessAclYamlConfigNormalTest() {
System.setProperty("rocketmq.home.dir", "src/test/resources");
System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl_update_create.yml");
String targetFileName = "src/test/resources/conf/plain_acl_update_create.yml";
Map<String, Object> backUpAclConfigMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
PlainAccessConfig plainAccessConfig = new PlainAccessConfig();
plainAccessConfig.setAccessKey("RocketMQ");
plainAccessConfig.setSecretKey("1234567890");
plainAccessConfig.setDefaultGroupPerm("PUB");
plainAccessConfig.setDefaultTopicPerm("SUB");
List<String> topicPerms = new ArrayList<String>();
topicPerms.add("topicC=PUB|SUB");
topicPerms.add("topicB=PUB");
plainAccessConfig.setTopicPerms(topicPerms);
List<String> groupPerms = new ArrayList<String>();
groupPerms.add("groupB=PUB|SUB");
groupPerms.add("groupC=DENY");
plainAccessConfig.setGroupPerms(groupPerms);
PlainAccessValidator plainAccessValidator = new PlainAccessValidator();
// Update acl access yaml config file
plainAccessValidator.updateAccessConfig(plainAccessConfig);
Map<String, Object> readableMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
List<Map<String, Object>> accounts = (List<Map<String, Object>>)readableMap.get("accounts");
Map<String, Object> verifyMap = null;
for (Map<String, Object> account : accounts) {
if (account.get("accessKey").equals(plainAccessConfig.getAccessKey())) {
verifyMap = account;
break;
}
}
Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_SECRET_KEY),"1234567890");
Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_DEFAULT_TOPIC_PERM),"SUB");
Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_DEFAULT_GROUP_PERM),"PUB");
Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_ADMIN_ROLE),false);
Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_WHITE_ADDR),"192.168.0.*");
Assert.assertEquals(((List)verifyMap.get(AclConstants.CONFIG_TOPIC_PERMS)).size(),2);
Assert.assertEquals(((List)verifyMap.get(AclConstants.CONFIG_GROUP_PERMS)).size(),2);
// Verify the dateversion element is correct or not
List<Map<String, Object>> dataVersions = (List<Map<String, Object>>) readableMap.get("dataVersion");
Assert.assertEquals(1,dataVersions.get(0).get("counter"));
// Restore the backup file and flush to yaml file
AclUtils.writeDataObject(targetFileName, backUpAclConfigMap);
}
@Test
public void updateAccessAclYamlConfigTest() {
System.setProperty("rocketmq.home.dir", "src/test/resources");
System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl_update_create.yml");
String targetFileName = "src/test/resources/conf/plain_acl_update_create.yml";
Map<String, Object> backUpAclConfigMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
PlainAccessConfig plainAccessConfig = new PlainAccessConfig();
plainAccessConfig.setAccessKey("RocketMQ");
plainAccessConfig.setSecretKey("123456789111");
PlainAccessValidator plainAccessValidator = new PlainAccessValidator();
// Update element in the acl access yaml config file
plainAccessValidator.updateAccessConfig(plainAccessConfig);
Map<String, Object> readableMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
List<Map<String, Object>> accounts = (List<Map<String, Object>>)readableMap.get(AclConstants.CONFIG_ACCOUNTS);
Map<String, Object> verifyMap = null;
for (Map<String, Object> account : accounts) {
if (account.get(AclConstants.CONFIG_ACCESS_KEY).equals(plainAccessConfig.getAccessKey())) {
verifyMap = account;
break;
}
}
Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_SECRET_KEY),"123456789111");
// Restore the backup file and flush to yaml file
AclUtils.writeDataObject(targetFileName, backUpAclConfigMap);
}
@Test
public void createAndUpdateAccessAclYamlConfigNormalTest() {
System.setProperty("rocketmq.home.dir", "src/test/resources");
System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl_update_create.yml");
String targetFileName = "src/test/resources/conf/plain_acl_update_create.yml";
Map<String, Object> backUpAclConfigMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
PlainAccessConfig plainAccessConfig = new PlainAccessConfig();
plainAccessConfig.setAccessKey("RocketMQ33");
plainAccessConfig.setSecretKey("123456789111");
plainAccessConfig.setDefaultGroupPerm("PUB");
plainAccessConfig.setDefaultTopicPerm("DENY");
List<String> topicPerms = new ArrayList<String>();
topicPerms.add("topicC=PUB|SUB");
topicPerms.add("topicB=PUB");
plainAccessConfig.setTopicPerms(topicPerms);
List<String> groupPerms = new ArrayList<String>();
groupPerms.add("groupB=PUB|SUB");
groupPerms.add("groupC=DENY");
plainAccessConfig.setGroupPerms(groupPerms);
PlainAccessValidator plainAccessValidator = new PlainAccessValidator();
// Create element in the acl access yaml config file
plainAccessValidator.updateAccessConfig(plainAccessConfig);
Map<String, Object> readableMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
List<Map<String, Object>> accounts = (List<Map<String, Object>>)readableMap.get(AclConstants.CONFIG_ACCOUNTS);
Map<String, Object> verifyMap = null;
for (Map<String, Object> account : accounts) {
if (account.get(AclConstants.CONFIG_ACCESS_KEY).equals(plainAccessConfig.getAccessKey())) {
verifyMap = account;
break;
}
}
Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_SECRET_KEY),"123456789111");
Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_DEFAULT_TOPIC_PERM),"DENY");
Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_DEFAULT_GROUP_PERM),"PUB");
Assert.assertEquals(((List)verifyMap.get(AclConstants.CONFIG_TOPIC_PERMS)).size(),2);
Assert.assertEquals(((List)verifyMap.get(AclConstants.CONFIG_GROUP_PERMS)).size(),2);
Assert.assertTrue(((List)verifyMap.get(AclConstants.CONFIG_TOPIC_PERMS)).contains("topicC=PUB|SUB"));
Assert.assertTrue(((List)verifyMap.get(AclConstants.CONFIG_TOPIC_PERMS)).contains("topicB=PUB"));
Assert.assertTrue(((List)verifyMap.get(AclConstants.CONFIG_GROUP_PERMS)).contains("groupB=PUB|SUB"));
Assert.assertTrue(((List)verifyMap.get(AclConstants.CONFIG_GROUP_PERMS)).contains("groupC=DENY"));
// Verify the dateversion element is correct or not
List<Map<String, Object>> dataVersions = (List<Map<String, Object>>) readableMap.get(AclConstants.CONFIG_DATA_VERSION);
Assert.assertEquals(1,dataVersions.get(0).get(AclConstants.CONFIG_COUNTER));
// Update element in the acl config yaml file
PlainAccessConfig plainAccessConfig2 = new PlainAccessConfig();
plainAccessConfig2.setAccessKey("rocketmq2");
plainAccessConfig2.setSecretKey("1234567890123");
// Update acl access yaml config file secondly
plainAccessValidator.updateAccessConfig(plainAccessConfig2);
Map<String, Object> readableMap2 = AclUtils.getYamlDataObject(targetFileName, Map.class);
List<Map<String, Object>> accounts2 = (List<Map<String, Object>>)readableMap2.get(AclConstants.CONFIG_ACCOUNTS);
Map<String, Object> verifyMap2 = null;
for (Map<String, Object> account : accounts2) {
if (account.get(AclConstants.CONFIG_ACCESS_KEY).equals(plainAccessConfig2.getAccessKey())) {
verifyMap2 = account;
break;
}
}
// Verify the dateversion element after updating is correct or not
List<Map<String, Object>> dataVersions2 = (List<Map<String, Object>>) readableMap2.get(AclConstants.CONFIG_DATA_VERSION);
Assert.assertEquals(2,dataVersions2.get(0).get(AclConstants.CONFIG_COUNTER));
Assert.assertEquals(verifyMap2.get(AclConstants.CONFIG_SECRET_KEY),"1234567890123");
// Restore the backup file and flush to yaml file
AclUtils.writeDataObject(targetFileName, backUpAclConfigMap);
}
@Test(expected = AclException.class)
public void updateAccessAclYamlConfigExceptionTest() {
System.setProperty("rocketmq.home.dir", "src/test/resources");
System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl_update_create.yml");
PlainAccessConfig plainAccessConfig = new PlainAccessConfig();
plainAccessConfig.setAccessKey("RocketMQ");
plainAccessConfig.setSecretKey("12345");
PlainAccessValidator plainAccessValidator = new PlainAccessValidator();
// Update acl access yaml config file
plainAccessValidator.updateAccessConfig(plainAccessConfig);
}
@Test
public void deleteAccessAclYamlConfigNormalTest() {
System.setProperty("rocketmq.home.dir", "src/test/resources");
System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl_delete.yml");
String targetFileName = "src/test/resources/conf/plain_acl_delete.yml";
Map<String, Object> backUpAclConfigMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
String accessKey = "rocketmq2";
PlainAccessValidator plainAccessValidator = new PlainAccessValidator();
plainAccessValidator.deleteAccessConfig(accessKey);
Map<String, Object> readableMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
List<Map<String, Object>> accounts = (List<Map<String, Object>>)readableMap.get(AclConstants.CONFIG_ACCOUNTS);
Map<String, Object> verifyMap = null;
for (Map<String, Object> account : accounts) {
if (account.get(AclConstants.CONFIG_ACCESS_KEY).equals(accessKey)) {
verifyMap = account;
break;
}
}
// Verify the specified element is removed or not
Assert.assertEquals(verifyMap,null);
// Verify the dateversion element is correct or not
List<Map<String, Object>> dataVersions = (List<Map<String, Object>>) readableMap.get(AclConstants.CONFIG_DATA_VERSION);
Assert.assertEquals(1,dataVersions.get(0).get(AclConstants.CONFIG_COUNTER));
// Restore the backup file and flush to yaml file
AclUtils.writeDataObject(targetFileName, backUpAclConfigMap);
}
@Test
public void updateAccessAclYamlConfigWithNoAccoutsExceptionTest() {
System.setProperty("rocketmq.home.dir", "src/test/resources");
System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl_with_no_accouts.yml");
String targetFileName = "src/test/resources/conf/plain_acl_with_no_accouts.yml";
Map<String, Object> backUpAclConfigMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
PlainAccessConfig plainAccessConfig = new PlainAccessConfig();
plainAccessConfig.setAccessKey("RocketMQ");
plainAccessConfig.setSecretKey("1234567890");
PlainAccessValidator plainAccessValidator = new PlainAccessValidator();
// Update acl access yaml config file and verify the return value is true
Assert.assertEquals(plainAccessValidator.updateAccessConfig(plainAccessConfig), false);
}
@Test
public void updateGlobalWhiteAddrsNormalTest() {
System.setProperty("rocketmq.home.dir", "src/test/resources");
System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl_global_white_addrs.yml");
String targetFileName = "src/test/resources/conf/plain_acl_global_white_addrs.yml";
Map<String, Object> backUpAclConfigMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
PlainAccessValidator plainAccessValidator = new PlainAccessValidator();
// Update global white remote addr value list in the acl access yaml config file
List<String> globalWhiteAddrsList = new ArrayList<String>();
globalWhiteAddrsList.add("10.10.154.1");
globalWhiteAddrsList.add("10.10.154.2");
globalWhiteAddrsList.add("10.10.154.3");
plainAccessValidator.updateGlobalWhiteAddrsConfig(globalWhiteAddrsList);
Map<String, Object> readableMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
List<String> globalWhiteAddrList = (List<String>)readableMap.get(AclConstants.CONFIG_GLOBAL_WHITE_ADDRS);
Assert.assertTrue(globalWhiteAddrList.contains("10.10.154.1"));
Assert.assertTrue(globalWhiteAddrList.contains("10.10.154.2"));
Assert.assertTrue(globalWhiteAddrList.contains("10.10.154.3"));
// Verify the dateversion element is correct or not
List<Map<String, Object>> dataVersions = (List<Map<String, Object>>) readableMap.get(AclConstants.CONFIG_DATA_VERSION);
Assert.assertEquals(1,dataVersions.get(0).get(AclConstants.CONFIG_COUNTER));
// Restore the backup file and flush to yaml file
AclUtils.writeDataObject(targetFileName, backUpAclConfigMap);
}
} }
...@@ -26,32 +26,31 @@ import java.util.Map; ...@@ -26,32 +26,31 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.acl.common.AclException; import org.apache.rocketmq.acl.common.AclException;
import org.apache.rocketmq.acl.common.AclUtils;
import org.apache.rocketmq.acl.common.Permission; import org.apache.rocketmq.acl.common.Permission;
import org.apache.rocketmq.acl.plain.PlainPermissionLoader.PlainAccessConfig; import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.UtilAll;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
public class PlainPermissionLoaderTest { public class PlainPermissionManagerTest {
PlainPermissionLoader plainPermissionLoader; PlainPermissionManager plainPermissionManager;
PlainAccessResource PUBPlainAccessResource; PlainAccessResource PUBPlainAccessResource;
PlainAccessResource SUBPlainAccessResource; PlainAccessResource SUBPlainAccessResource;
PlainAccessResource ANYPlainAccessResource; PlainAccessResource ANYPlainAccessResource;
PlainAccessResource DENYPlainAccessResource; PlainAccessResource DENYPlainAccessResource;
PlainAccessResource plainAccessResource = new PlainAccessResource(); PlainAccessResource plainAccessResource = new PlainAccessResource();
PlainAccessConfig plainAccessConfig = new PlainAccessConfig(); PlainAccessConfig plainAccessConfig = new PlainAccessConfig();
PlainAccessResource plainAccessResourceTwo = new PlainAccessResource();
Set<Integer> adminCode = new HashSet<>(); Set<Integer> adminCode = new HashSet<>();
@Before @Before
public void init() throws NoSuchFieldException, SecurityException, IOException { public void init() throws NoSuchFieldException, SecurityException, IOException {
// UPDATE_AND_CREATE_TOPIC // UPDATE_AND_CREATE_TOPIC
adminCode.add(17); adminCode.add(17);
// UPDATE_BROKER_CONFIG // UPDATE_BROKER_CONFIG
adminCode.add(25); adminCode.add(25);
// DELETE_TOPIC_IN_BROKER // DELETE_TOPIC_IN_BROKER
adminCode.add(215); adminCode.add(215);
// UPDATE_AND_CREATE_SUBSCRIPTIONGROUP // UPDATE_AND_CREATE_SUBSCRIPTIONGROUP
adminCode.add(200); adminCode.add(200);
...@@ -65,7 +64,8 @@ public class PlainPermissionLoaderTest { ...@@ -65,7 +64,8 @@ public class PlainPermissionLoaderTest {
System.setProperty("rocketmq.home.dir", "src/test/resources"); System.setProperty("rocketmq.home.dir", "src/test/resources");
System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl.yml"); System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl.yml");
plainPermissionLoader = new PlainPermissionLoader();
plainPermissionManager = new PlainPermissionManager();
} }
...@@ -95,16 +95,16 @@ public class PlainPermissionLoaderTest { ...@@ -95,16 +95,16 @@ public class PlainPermissionLoaderTest {
plainAccess.setAccessKey("RocketMQ"); plainAccess.setAccessKey("RocketMQ");
plainAccess.setSecretKey("12345678"); plainAccess.setSecretKey("12345678");
plainAccessResource = plainPermissionLoader.buildPlainAccessResource(plainAccess); plainAccessResource = plainPermissionManager.buildPlainAccessResource(plainAccess);
Assert.assertEquals(plainAccessResource.getAccessKey(), "RocketMQ"); Assert.assertEquals(plainAccessResource.getAccessKey(), "RocketMQ");
Assert.assertEquals(plainAccessResource.getSecretKey(), "12345678"); Assert.assertEquals(plainAccessResource.getSecretKey(), "12345678");
plainAccess.setWhiteRemoteAddress("127.0.0.1"); plainAccess.setWhiteRemoteAddress("127.0.0.1");
plainAccessResource = plainPermissionLoader.buildPlainAccessResource(plainAccess); plainAccessResource = plainPermissionManager.buildPlainAccessResource(plainAccess);
Assert.assertEquals(plainAccessResource.getWhiteRemoteAddress(), "127.0.0.1"); Assert.assertEquals(plainAccessResource.getWhiteRemoteAddress(), "127.0.0.1");
plainAccess.setAdmin(true); plainAccess.setAdmin(true);
plainAccessResource = plainPermissionLoader.buildPlainAccessResource(plainAccess); plainAccessResource = plainPermissionManager.buildPlainAccessResource(plainAccess);
Assert.assertEquals(plainAccessResource.isAdmin(), true); Assert.assertEquals(plainAccessResource.isAdmin(), true);
List<String> groups = new ArrayList<String>(); List<String> groups = new ArrayList<String>();
...@@ -112,7 +112,7 @@ public class PlainPermissionLoaderTest { ...@@ -112,7 +112,7 @@ public class PlainPermissionLoaderTest {
groups.add("groupB=PUB|SUB"); groups.add("groupB=PUB|SUB");
groups.add("groupC=PUB"); groups.add("groupC=PUB");
plainAccess.setGroupPerms(groups); plainAccess.setGroupPerms(groups);
plainAccessResource = plainPermissionLoader.buildPlainAccessResource(plainAccess); plainAccessResource = plainPermissionManager.buildPlainAccessResource(plainAccess);
Map<String, Byte> resourcePermMap = plainAccessResource.getResourcePermMap(); Map<String, Byte> resourcePermMap = plainAccessResource.getResourcePermMap();
Assert.assertEquals(resourcePermMap.size(), 3); Assert.assertEquals(resourcePermMap.size(), 3);
...@@ -125,7 +125,7 @@ public class PlainPermissionLoaderTest { ...@@ -125,7 +125,7 @@ public class PlainPermissionLoaderTest {
topics.add("topicB=PUB|SUB"); topics.add("topicB=PUB|SUB");
topics.add("topicC=PUB"); topics.add("topicC=PUB");
plainAccess.setTopicPerms(topics); plainAccess.setTopicPerms(topics);
plainAccessResource = plainPermissionLoader.buildPlainAccessResource(plainAccess); plainAccessResource = plainPermissionManager.buildPlainAccessResource(plainAccess);
resourcePermMap = plainAccessResource.getResourcePermMap(); resourcePermMap = plainAccessResource.getResourcePermMap();
Assert.assertEquals(resourcePermMap.size(), 6); Assert.assertEquals(resourcePermMap.size(), 6);
...@@ -138,7 +138,7 @@ public class PlainPermissionLoaderTest { ...@@ -138,7 +138,7 @@ public class PlainPermissionLoaderTest {
public void checkPermAdmin() { public void checkPermAdmin() {
PlainAccessResource plainAccessResource = new PlainAccessResource(); PlainAccessResource plainAccessResource = new PlainAccessResource();
plainAccessResource.setRequestCode(17); plainAccessResource.setRequestCode(17);
plainPermissionLoader.checkPerm(plainAccessResource, PUBPlainAccessResource); plainPermissionManager.checkPerm(plainAccessResource, PUBPlainAccessResource);
} }
@Test @Test
...@@ -146,15 +146,15 @@ public class PlainPermissionLoaderTest { ...@@ -146,15 +146,15 @@ public class PlainPermissionLoaderTest {
PlainAccessResource plainAccessResource = new PlainAccessResource(); PlainAccessResource plainAccessResource = new PlainAccessResource();
plainAccessResource.addResourceAndPerm("topicA", Permission.PUB); plainAccessResource.addResourceAndPerm("topicA", Permission.PUB);
plainPermissionLoader.checkPerm(plainAccessResource, PUBPlainAccessResource); plainPermissionManager.checkPerm(plainAccessResource, PUBPlainAccessResource);
plainAccessResource.addResourceAndPerm("topicB", Permission.SUB); plainAccessResource.addResourceAndPerm("topicB", Permission.SUB);
plainPermissionLoader.checkPerm(plainAccessResource, ANYPlainAccessResource); plainPermissionManager.checkPerm(plainAccessResource, ANYPlainAccessResource);
plainAccessResource = new PlainAccessResource(); plainAccessResource = new PlainAccessResource();
plainAccessResource.addResourceAndPerm("topicB", Permission.SUB); plainAccessResource.addResourceAndPerm("topicB", Permission.SUB);
plainPermissionLoader.checkPerm(plainAccessResource, SUBPlainAccessResource); plainPermissionManager.checkPerm(plainAccessResource, SUBPlainAccessResource);
plainAccessResource.addResourceAndPerm("topicA", Permission.PUB); plainAccessResource.addResourceAndPerm("topicA", Permission.PUB);
plainPermissionLoader.checkPerm(plainAccessResource, ANYPlainAccessResource); plainPermissionManager.checkPerm(plainAccessResource, ANYPlainAccessResource);
} }
@Test(expected = AclException.class) @Test(expected = AclException.class)
...@@ -162,55 +162,58 @@ public class PlainPermissionLoaderTest { ...@@ -162,55 +162,58 @@ public class PlainPermissionLoaderTest {
plainAccessResource = new PlainAccessResource(); plainAccessResource = new PlainAccessResource();
plainAccessResource.addResourceAndPerm("topicF", Permission.PUB); plainAccessResource.addResourceAndPerm("topicF", Permission.PUB);
plainPermissionLoader.checkPerm(plainAccessResource, SUBPlainAccessResource); plainPermissionManager.checkPerm(plainAccessResource, SUBPlainAccessResource);
} }
@Test(expected = AclException.class) @Test(expected = AclException.class)
public void accountNullTest() { public void accountNullTest() {
plainAccessConfig.setAccessKey(null); plainAccessConfig.setAccessKey(null);
plainPermissionLoader.buildPlainAccessResource(plainAccessConfig); plainPermissionManager.buildPlainAccessResource(plainAccessConfig);
} }
@Test(expected = AclException.class) @Test(expected = AclException.class)
public void accountThanTest() { public void accountThanTest() {
plainAccessConfig.setAccessKey("123"); plainAccessConfig.setAccessKey("123");
plainPermissionLoader.buildPlainAccessResource(plainAccessConfig); plainPermissionManager.buildPlainAccessResource(plainAccessConfig);
} }
@Test(expected = AclException.class) @Test(expected = AclException.class)
public void passWordtNullTest() { public void passWordtNullTest() {
plainAccessConfig.setAccessKey(null); plainAccessConfig.setAccessKey(null);
plainPermissionLoader.buildPlainAccessResource(plainAccessConfig); plainPermissionManager.buildPlainAccessResource(plainAccessConfig);
} }
@Test(expected = AclException.class) @Test(expected = AclException.class)
public void passWordThanTest() { public void passWordThanTest() {
plainAccessConfig.setAccessKey("123"); plainAccessConfig.setAccessKey("123");
plainPermissionLoader.buildPlainAccessResource(plainAccessConfig); plainPermissionManager.buildPlainAccessResource(plainAccessConfig);
} }
@Test(expected = AclException.class) @Test(expected = AclException.class)
public void testPlainAclPlugEngineInit() { public void testPlainAclPlugEngineInit() {
System.setProperty("rocketmq.home.dir", ""); System.setProperty("rocketmq.home.dir", "");
new PlainPermissionLoader().load(); new PlainPermissionManager().load();
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Test @Test
public void cleanAuthenticationInfoTest() throws IllegalAccessException { public void cleanAuthenticationInfoTest() throws IllegalAccessException {
//plainPermissionLoader.addPlainAccessResource(plainAccessResource); // PlainPermissionManager.addPlainAccessResource(plainAccessResource);
Map<String, List<PlainAccessResource>> plainAccessResourceMap = (Map<String, List<PlainAccessResource>>) FieldUtils.readDeclaredField(plainPermissionLoader, "plainAccessResourceMap", true); Map<String, List<PlainAccessResource>> plainAccessResourceMap = (Map<String, List<PlainAccessResource>>) FieldUtils.readDeclaredField(plainPermissionManager, "plainAccessResourceMap", true);
Assert.assertFalse(plainAccessResourceMap.isEmpty()); Assert.assertFalse(plainAccessResourceMap.isEmpty());
plainPermissionLoader.clearPermissionInfo(); plainPermissionManager.clearPermissionInfo();
plainAccessResourceMap = (Map<String, List<PlainAccessResource>>) FieldUtils.readDeclaredField(plainPermissionLoader, "plainAccessResourceMap", true); plainAccessResourceMap = (Map<String, List<PlainAccessResource>>) FieldUtils.readDeclaredField(plainPermissionManager, "plainAccessResourceMap", true);
Assert.assertTrue(plainAccessResourceMap.isEmpty()); Assert.assertTrue(plainAccessResourceMap.isEmpty());
// RemoveDataVersionFromYamlFile("src/test/resources/conf/plain_acl.yml");
} }
@Test @Test
public void isWatchStartTest() { public void isWatchStartTest() {
PlainPermissionLoader plainPermissionLoader = new PlainPermissionLoader(); PlainPermissionManager plainPermissionManager = new PlainPermissionManager();
Assert.assertTrue(plainPermissionLoader.isWatchStart()); Assert.assertTrue(plainPermissionManager.isWatchStart());
// RemoveDataVersionFromYamlFile("src/test/resources/conf/plain_acl.yml");
} }
...@@ -231,11 +234,12 @@ public class PlainPermissionLoaderTest { ...@@ -231,11 +234,12 @@ public class PlainPermissionLoaderTest {
writer.flush(); writer.flush();
writer.close(); writer.close();
PlainPermissionLoader plainPermissionLoader = new PlainPermissionLoader();
Assert.assertTrue(plainPermissionLoader.isWatchStart()); PlainPermissionManager plainPermissionManager = new PlainPermissionManager();
Assert.assertTrue(plainPermissionManager.isWatchStart());
{ {
Map<String, PlainAccessResource> plainAccessResourceMap = (Map<String, PlainAccessResource>) FieldUtils.readDeclaredField(plainPermissionLoader, "plainAccessResourceMap", true); Map<String, PlainAccessResource> plainAccessResourceMap = (Map<String, PlainAccessResource>) FieldUtils.readDeclaredField(plainPermissionManager, "plainAccessResourceMap", true);
PlainAccessResource accessResource = plainAccessResourceMap.get("watchrocketmq"); PlainAccessResource accessResource = plainAccessResourceMap.get("watchrocketmq");
Assert.assertNotNull(accessResource); Assert.assertNotNull(accessResource);
Assert.assertEquals(accessResource.getSecretKey(), "12345678"); Assert.assertEquals(accessResource.getSecretKey(), "12345678");
...@@ -243,17 +247,19 @@ public class PlainPermissionLoaderTest { ...@@ -243,17 +247,19 @@ public class PlainPermissionLoaderTest {
} }
writer = new FileWriter(new File(fileName), true); Map<String, Object> updatedMap = AclUtils.getYamlDataObject(fileName, Map.class);
writer.write("- accessKey: watchrocketmq1\r\n"); List<Map<String, Object>> accounts = (List<Map<String, Object>>) updatedMap.get("accounts");
writer.write(" secretKey: 88888888\r\n"); accounts.get(0).remove("accessKey");
writer.write(" whiteRemoteAddress: 127.0.0.1\r\n"); accounts.get(0).remove("secretKey");
writer.write(" admin: false\r\n"); accounts.get(0).put("accessKey", "watchrocketmq1");
writer.flush(); accounts.get(0).put("secretKey", "88888888");
writer.close(); accounts.get(0).put("admin", "false");
// Update file and flush to yaml file
AclUtils.writeDataObject(fileName, updatedMap);
Thread.sleep(1000); Thread.sleep(1000);
{ {
Map<String, PlainAccessResource> plainAccessResourceMap = (Map<String, PlainAccessResource>) FieldUtils.readDeclaredField(plainPermissionLoader, "plainAccessResourceMap", true); Map<String, PlainAccessResource> plainAccessResourceMap = (Map<String, PlainAccessResource>) FieldUtils.readDeclaredField(plainPermissionManager, "plainAccessResourceMap", true);
PlainAccessResource accessResource = plainAccessResourceMap.get("watchrocketmq1"); PlainAccessResource accessResource = plainAccessResourceMap.get("watchrocketmq1");
Assert.assertNotNull(accessResource); Assert.assertNotNull(accessResource);
Assert.assertEquals(accessResource.getSecretKey(), "88888888"); Assert.assertEquals(accessResource.getSecretKey(), "88888888");
...@@ -268,8 +274,7 @@ public class PlainPermissionLoaderTest { ...@@ -268,8 +274,7 @@ public class PlainPermissionLoaderTest {
@Test(expected = AclException.class) @Test(expected = AclException.class)
public void initializeTest() { public void initializeTest() {
System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl_null.yml"); System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl_null.yml");
new PlainPermissionLoader(); new PlainPermissionManager();
} }
} }
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
## suggested format
globalWhiteRemoteAddresses:
- 10.10.103.*
- 192.168.0.*
accounts:
- accessKey: RocketMQ
secretKey: 12345678
whiteRemoteAddress: 192.168.0.*
admin: false
defaultTopicPerm: DENY
defaultGroupPerm: SUB
topicPerms:
- topicA=DENY
- topicB=PUB|SUB
- topicC=SUB
groupPerms:
- groupA=DENY
- groupB=SUB
- groupC=SUB
- accessKey: rocketmq2
secretKey: 12345678
whiteRemoteAddress: 192.168.1.*
admin: true
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
## suggested format
globalWhiteRemoteAddresses:
- 10.10.103.*
- 192.168.0.*
accounts:
- accessKey: RocketMQ
secretKey: 12345678
whiteRemoteAddress: 192.168.0.*
admin: false
defaultTopicPerm: DENY
defaultGroupPerm: SUB
topicPerms:
- topicA=DENY
- topicB=PUB|SUB
- topicC=SUB
groupPerms:
- groupA=DENY
- groupB=SUB
- groupC=SUB
- accessKey: rocketmq2
secretKey: 12345678
whiteRemoteAddress: 192.168.1.*
admin: true
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
## suggested format
globalWhiteRemoteAddresses:
- 10.10.103.*
- 192.168.0.*
accounts:
- accessKey: RocketMQ
secretKey: 12345678
whiteRemoteAddress: 192.168.0.*
admin: false
defaultTopicPerm: DENY
defaultGroupPerm: SUB
topicPerms:
- topicA=DENY
- topicB=PUB|SUB
- topicC=SUB
groupPerms:
- groupA=DENY
- groupB=SUB
- groupC=SUB
- accessKey: rocketmq2
secretKey: 12345678
whiteRemoteAddress: 192.168.1.*
admin: true
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
## suggested format
globalWhiteRemoteAddresses:
- 10.10.103.*
- 192.168.0.*
accounts:
- accessKey: RocketMQ
secretKey: 12345678
whiteRemoteAddress: 192.168.0.*
admin: false
defaultTopicPerm: DENY
defaultGroupPerm: SUB
topicPerms:
- topicA=DENY
- topicB=PUB|SUB
- topicC=SUB
groupPerms:
- groupA=DENY
- groupB=SUB
- groupC=SUB
- accessKey: rocketmq2
secretKey: 12345678
whiteRemoteAddress: 192.168.1.*
admin: true
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
## suggested format
globalWhiteRemoteAddresses:
- 10.10.103.*
- 192.168.0.*
\ No newline at end of file
...@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker; ...@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
...@@ -162,7 +163,7 @@ public class BrokerController { ...@@ -162,7 +163,7 @@ public class BrokerController {
private TransactionalMessageService transactionalMessageService; private TransactionalMessageService transactionalMessageService;
private AbstractTransactionalMessageCheckListener transactionalMessageCheckListener; private AbstractTransactionalMessageCheckListener transactionalMessageCheckListener;
private Future<?> slaveSyncFuture; private Future<?> slaveSyncFuture;
private Map<Class,AccessValidator> accessValidatorMap = new HashMap<Class, AccessValidator>();
public BrokerController( public BrokerController(
final BrokerConfig brokerConfig, final BrokerConfig brokerConfig,
...@@ -502,6 +503,7 @@ public class BrokerController { ...@@ -502,6 +503,7 @@ public class BrokerController {
for (AccessValidator accessValidator: accessValidators) { for (AccessValidator accessValidator: accessValidators) {
final AccessValidator validator = accessValidator; final AccessValidator validator = accessValidator;
accessValidatorMap.put(validator.getClass(),validator);
this.registerServerRPCHook(new RPCHook() { this.registerServerRPCHook(new RPCHook() {
@Override @Override
...@@ -1101,7 +1103,9 @@ public class BrokerController { ...@@ -1101,7 +1103,9 @@ public class BrokerController {
} }
public Map<Class, AccessValidator> getAccessValidatorMap() {
return accessValidatorMap;
}
private void handleSlaveSynchronize(BrokerRole role) { private void handleSlaveSynchronize(BrokerRole role) {
if (role == BrokerRole.SLAVE) { if (role == BrokerRole.SLAVE) {
......
...@@ -30,6 +30,8 @@ import java.util.Map; ...@@ -30,6 +30,8 @@ import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.acl.plain.PlainAccessValidator;
import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo; import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
...@@ -37,6 +39,7 @@ import org.apache.rocketmq.broker.filter.ConsumerFilterData; ...@@ -37,6 +39,7 @@ import org.apache.rocketmq.broker.filter.ConsumerFilterData;
import org.apache.rocketmq.broker.filter.ExpressionMessageFilter; import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.ConsumeStats;
...@@ -44,6 +47,10 @@ import org.apache.rocketmq.common.admin.OffsetWrapper; ...@@ -44,6 +47,10 @@ import org.apache.rocketmq.common.admin.OffsetWrapper;
import org.apache.rocketmq.common.admin.TopicOffset; import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.header.CreateAccessConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.DeleteAccessConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHeader;
import org.apache.rocketmq.common.protocol.header.UpdateGlobalWhiteAddrsConfigRequestHeader;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageDecoder;
...@@ -201,6 +208,14 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { ...@@ -201,6 +208,14 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return fetchAllConsumeStatsInBroker(ctx, request); return fetchAllConsumeStatsInBroker(ctx, request);
case RequestCode.QUERY_CONSUME_QUEUE: case RequestCode.QUERY_CONSUME_QUEUE:
return queryConsumeQueue(ctx, request); return queryConsumeQueue(ctx, request);
case RequestCode.UPDATE_AND_CREATE_ACL_CONFIG:
return updateAndCreateAccessConfig(ctx, request);
case RequestCode.DELETE_ACL_CONFIG:
return deleteAccessConfig(ctx, request);
case RequestCode.GET_BROKER_CLUSTER_ACL_INFO:
return getBrokerAclConfigVersion(ctx, request);
case RequestCode.UPDATE_GLOBAL_WHITE_ADDRS_CONFIG:
return updateGlobalWhiteAddrsConfig(ctx, request);
default: default:
break; break;
} }
...@@ -269,6 +284,140 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { ...@@ -269,6 +284,140 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return response; return response;
} }
private synchronized RemotingCommand updateAndCreateAccessConfig(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final CreateAccessConfigRequestHeader requestHeader =
(CreateAccessConfigRequestHeader) request.decodeCommandCustomHeader(CreateAccessConfigRequestHeader.class);
PlainAccessConfig accessConfig = new PlainAccessConfig();
accessConfig.setAccessKey(requestHeader.getAccessKey());
accessConfig.setSecretKey(requestHeader.getSecretKey());
accessConfig.setWhiteRemoteAddress(requestHeader.getWhiteRemoteAddress());
accessConfig.setDefaultTopicPerm(requestHeader.getDefaultTopicPerm());
accessConfig.setDefaultGroupPerm(requestHeader.getDefaultGroupPerm());
accessConfig.setTopicPerms(UtilAll.String2List(requestHeader.getTopicPerms(),","));
accessConfig.setGroupPerms(UtilAll.String2List(requestHeader.getGroupPerms(),","));
accessConfig.setAdmin(requestHeader.isAdmin());
try {
AccessValidator accessValidator = this.brokerController.getAccessValidatorMap().get(PlainAccessValidator.class);
if (accessValidator.updateAccessConfig(accessConfig)) {
response.setCode(ResponseCode.SUCCESS);
response.setOpaque(request.getOpaque());
response.markResponseType();
response.setRemark(null);
ctx.writeAndFlush(response);
} else {
String errorMsg = "The accesskey[" + requestHeader.getAccessKey() + "] corresponding to accessConfig has been updated failed.";
log.warn(errorMsg);
response.setCode(ResponseCode.UPDATE_AND_CREATE_ACL_CONFIG_FAILED);
response.setRemark(errorMsg);
return response;
}
} catch (Exception e) {
log.error("Failed to generate a proper update accessvalidator response", e);
response.setCode(ResponseCode.UPDATE_AND_CREATE_ACL_CONFIG_FAILED);
response.setRemark(e.getMessage());
return response;
}
return null;
}
private synchronized RemotingCommand deleteAccessConfig(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final DeleteAccessConfigRequestHeader requestHeader =
(DeleteAccessConfigRequestHeader) request.decodeCommandCustomHeader(DeleteAccessConfigRequestHeader.class);
log.info("DeleteAccessConfig called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
try {
String accessKey = requestHeader.getAccessKey();
AccessValidator accessValidator = this.brokerController.getAccessValidatorMap().get(PlainAccessValidator.class);
if (accessValidator.deleteAccessConfig(accessKey)) {
response.setCode(ResponseCode.SUCCESS);
response.setOpaque(request.getOpaque());
response.markResponseType();
response.setRemark(null);
ctx.writeAndFlush(response);
} else {
String errorMsg = "The accesskey[" + requestHeader.getAccessKey() + "] corresponding to accessConfig has been deleted failed.";
log.warn(errorMsg);
response.setCode(ResponseCode.DELETE_ACL_CONFIG_FAILED);
response.setRemark(errorMsg);
return response;
}
} catch (Exception e) {
log.error("Failed to generate a proper delete accessvalidator response", e);
response.setCode(ResponseCode.DELETE_ACL_CONFIG_FAILED);
response.setRemark(e.getMessage());
return response;
}
return null;
}
private synchronized RemotingCommand updateGlobalWhiteAddrsConfig(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final UpdateGlobalWhiteAddrsConfigRequestHeader requestHeader =
(UpdateGlobalWhiteAddrsConfigRequestHeader) request.decodeCommandCustomHeader(UpdateGlobalWhiteAddrsConfigRequestHeader.class);
try {
AccessValidator accessValidator = this.brokerController.getAccessValidatorMap().get(PlainAccessValidator.class);
if (accessValidator.updateGlobalWhiteAddrsConfig(UtilAll.String2List(requestHeader.getGlobalWhiteAddrs(),","))) {
response.setCode(ResponseCode.SUCCESS);
response.setOpaque(request.getOpaque());
response.markResponseType();
response.setRemark(null);
ctx.writeAndFlush(response);
} else {
String errorMsg = "The globalWhiteAddresses[" + requestHeader.getGlobalWhiteAddrs() + "] has been updated failed.";
log.warn(errorMsg);
response.setCode(ResponseCode.UPDATE_GLOBAL_WHITE_ADDRS_CONFIG_FAILED);
response.setRemark(errorMsg);
return response;
}
} catch (Exception e) {
log.error("Failed to generate a proper update globalWhiteAddresses response", e);
response.setCode(ResponseCode.UPDATE_GLOBAL_WHITE_ADDRS_CONFIG_FAILED);
response.setRemark(e.getMessage());
return response;
}
return null;
}
private RemotingCommand getBrokerAclConfigVersion(ChannelHandlerContext ctx, RemotingCommand request) {
final RemotingCommand response = RemotingCommand.createResponseCommand(GetBrokerAclConfigResponseHeader.class);
final GetBrokerAclConfigResponseHeader responseHeader = (GetBrokerAclConfigResponseHeader)response.readCustomHeader();
try {
AccessValidator accessValidator = this.brokerController.getAccessValidatorMap().get(PlainAccessValidator.class);
responseHeader.setVersion(accessValidator.getAclConfigVersion());
responseHeader.setBrokerAddr(this.brokerController.getBrokerAddr());
responseHeader.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
responseHeader.setClusterName(this.brokerController.getBrokerConfig().getBrokerClusterName());
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
} catch (Exception e) {
log.error("Failed to generate a proper getBrokerAclConfigVersion response", e);
}
return null;
}
private RemotingCommand getAllTopicConfig(ChannelHandlerContext ctx, RemotingCommand request) { private RemotingCommand getAllTopicConfig(ChannelHandlerContext ctx, RemotingCommand request) {
final RemotingCommand response = RemotingCommand.createResponseCommand(GetAllTopicConfigResponseHeader.class); final RemotingCommand response = RemotingCommand.createResponseCommand(GetAllTopicConfigResponseHeader.class);
// final GetAllTopicConfigResponseHeader responseHeader = // final GetAllTopicConfigResponseHeader responseHeader =
......
...@@ -44,8 +44,10 @@ import org.apache.rocketmq.client.log.ClientLogger; ...@@ -44,8 +44,10 @@ import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.ConsumeStats;
...@@ -63,6 +65,7 @@ import org.apache.rocketmq.common.protocol.RequestCode; ...@@ -63,6 +65,7 @@ import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.BrokerStatsData; import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
import org.apache.rocketmq.common.protocol.body.CheckClientRequestBody; import org.apache.rocketmq.common.protocol.body.CheckClientRequestBody;
import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo;
import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.body.ConsumeStatsList; import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
...@@ -86,10 +89,13 @@ import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody; ...@@ -86,10 +89,13 @@ import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody;
import org.apache.rocketmq.common.protocol.header.CloneGroupOffsetRequestHeader; import org.apache.rocketmq.common.protocol.header.CloneGroupOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader; import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader; import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
import org.apache.rocketmq.common.protocol.header.CreateAccessConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader; import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader;
import org.apache.rocketmq.common.protocol.header.DeleteAccessConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.DeleteSubscriptionGroupRequestHeader; import org.apache.rocketmq.common.protocol.header.DeleteSubscriptionGroupRequestHeader;
import org.apache.rocketmq.common.protocol.header.DeleteTopicRequestHeader; import org.apache.rocketmq.common.protocol.header.DeleteTopicRequestHeader;
import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader; import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHeader;
import org.apache.rocketmq.common.protocol.header.GetConsumeStatsInBrokerHeader; import org.apache.rocketmq.common.protocol.header.GetConsumeStatsInBrokerHeader;
import org.apache.rocketmq.common.protocol.header.GetConsumeStatsRequestHeader; import org.apache.rocketmq.common.protocol.header.GetConsumeStatsRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetConsumerConnectionListRequestHeader; import org.apache.rocketmq.common.protocol.header.GetConsumerConnectionListRequestHeader;
...@@ -123,6 +129,7 @@ import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2; ...@@ -123,6 +129,7 @@ import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader; import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader; import org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader; import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.UpdateGlobalWhiteAddrsConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHeader; import org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHeader;
import org.apache.rocketmq.common.protocol.header.ViewMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.ViewMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterMessageFilterClassRequestHeader; import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterMessageFilterClassRequestHeader;
...@@ -284,6 +291,104 @@ public class MQClientAPIImpl { ...@@ -284,6 +291,104 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark()); throw new MQClientException(response.getCode(), response.getRemark());
} }
public void createPlainAccessConfig(final String addr, final PlainAccessConfig plainAccessConfig,
final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
CreateAccessConfigRequestHeader requestHeader = new CreateAccessConfigRequestHeader();
requestHeader.setAccessKey(plainAccessConfig.getAccessKey());
requestHeader.setSecretKey(plainAccessConfig.getSecretKey());
requestHeader.setAdmin(plainAccessConfig.isAdmin());
requestHeader.setDefaultGroupPerm(plainAccessConfig.getDefaultGroupPerm());
requestHeader.setDefaultTopicPerm(plainAccessConfig.getDefaultTopicPerm());
requestHeader.setWhiteRemoteAddress(plainAccessConfig.getWhiteRemoteAddress());
requestHeader.setTopicPerms(UtilAll.List2String(plainAccessConfig.getTopicPerms(),","));
requestHeader.setGroupPerms(UtilAll.List2String(plainAccessConfig.getGroupPerms(),","));
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_ACL_CONFIG, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return;
}
default:
break;
}
throw new MQClientException(response.getCode(), response.getRemark());
}
public void deleteAccessConfig(final String addr, final String accessKey, final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
DeleteAccessConfigRequestHeader requestHeader = new DeleteAccessConfigRequestHeader();
requestHeader.setAccessKey(accessKey);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_ACL_CONFIG, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return;
}
default:
break;
}
throw new MQClientException(response.getCode(), response.getRemark());
}
public void updateGlobalWhiteAddrsConfig(final String addr, final String globalWhiteAddrs,final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
UpdateGlobalWhiteAddrsConfigRequestHeader requestHeader = new UpdateGlobalWhiteAddrsConfigRequestHeader();
requestHeader.setGlobalWhiteAddrs(globalWhiteAddrs);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_GLOBAL_WHITE_ADDRS_CONFIG, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return;
}
default:
break;
}
throw new MQClientException(response.getCode(), response.getRemark());
}
public ClusterAclVersionInfo getBrokerClusterAclInfo(final String addr,final long timeoutMillis) throws RemotingCommandException, InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_ACL_INFO, null);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
GetBrokerAclConfigResponseHeader responseHeader =
(GetBrokerAclConfigResponseHeader) response.decodeCommandCustomHeader(GetBrokerAclConfigResponseHeader.class);
ClusterAclVersionInfo clusterAclVersionInfo = new ClusterAclVersionInfo();
clusterAclVersionInfo.setClusterName(responseHeader.getClusterName());
clusterAclVersionInfo.setBrokerName(responseHeader.getBrokerName());
clusterAclVersionInfo.setBrokerAddr(responseHeader.getBrokerAddr());
clusterAclVersionInfo.setAclConfigDataVersion(DataVersion.fromJson(responseHeader.getVersion(), DataVersion.class));
return clusterAclVersionInfo;
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark());
}
public SendResult sendMessage( public SendResult sendMessage(
final String addr, final String addr,
final String brokerName, final String brokerName,
......
...@@ -19,17 +19,24 @@ package org.apache.rocketmq.client.impl; ...@@ -19,17 +19,24 @@ package org.apache.rocketmq.client.impl;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.hook.SendMessageContext; import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo;
import org.apache.rocketmq.common.protocol.header.CreateAccessConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader; import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RemotingClient; import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingException;
...@@ -210,6 +217,84 @@ public class MQClientAPIImplTest { ...@@ -210,6 +217,84 @@ public class MQClientAPIImplTest {
} }
} }
@Test
public void testCreatePlainAccessConfig_Success() throws InterruptedException, RemotingException, MQBrokerException {
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock mock) throws Throwable {
RemotingCommand request = mock.getArgument(1);
return createSuccessResponse4UpdateAclConfig(request);
}
}).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
PlainAccessConfig config = createUpdateAclConfig();
try {
mqClientAPI.createPlainAccessConfig(brokerAddr, config, 3 * 1000);
} catch (MQClientException ex) {
}
}
@Test
public void testCreatePlainAccessConfig_Exception() throws InterruptedException, RemotingException, MQBrokerException {
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock mock) throws Throwable {
RemotingCommand request = mock.getArgument(1);
return createErrorResponse4UpdateAclConfig(request);
}
}).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
PlainAccessConfig config = createUpdateAclConfig();
try {
mqClientAPI.createPlainAccessConfig(brokerAddr, config, 3 * 1000);
} catch (MQClientException ex) {
assertThat(ex.getResponseCode()).isEqualTo(209);
assertThat(ex.getErrorMessage()).isEqualTo("corresponding to accessConfig has been updated failed");
}
}
@Test
public void testDeleteAccessConfig_Success() throws InterruptedException, RemotingException, MQBrokerException {
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock mock) throws Throwable {
RemotingCommand request = mock.getArgument(1);
return createSuccessResponse4DeleteAclConfig(request);
}
}).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
String accessKey = "1234567";
try {
mqClientAPI.deleteAccessConfig(brokerAddr, accessKey, 3 * 1000);
} catch (MQClientException ex) {
}
}
@Test
public void testDeleteAccessConfig_Exception() throws InterruptedException, RemotingException, MQBrokerException {
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock mock) throws Throwable {
RemotingCommand request = mock.getArgument(1);
return createErrorResponse4DeleteAclConfig(request);
}
}).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
try {
mqClientAPI.deleteAccessConfig(brokerAddr, "11111", 3 * 1000);
} catch (MQClientException ex) {
assertThat(ex.getResponseCode()).isEqualTo(210);
assertThat(ex.getErrorMessage()).isEqualTo("corresponding to accessConfig has been deleted failed");
}
}
private RemotingCommand createSuccessResponse(RemotingCommand request) { private RemotingCommand createSuccessResponse(RemotingCommand request) {
RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
response.setCode(ResponseCode.SUCCESS); response.setCode(ResponseCode.SUCCESS);
...@@ -228,6 +313,58 @@ public class MQClientAPIImplTest { ...@@ -228,6 +313,58 @@ public class MQClientAPIImplTest {
return response; return response;
} }
private RemotingCommand createSuccessResponse4UpdateAclConfig(RemotingCommand request) {
RemotingCommand response = RemotingCommand.createResponseCommand(null);
response.setCode(ResponseCode.SUCCESS);
response.setOpaque(request.getOpaque());
response.markResponseType();
response.setRemark(null);
return response;
}
private RemotingCommand createSuccessResponse4DeleteAclConfig(RemotingCommand request) {
RemotingCommand response = RemotingCommand.createResponseCommand(null);
response.setCode(ResponseCode.SUCCESS);
response.setOpaque(request.getOpaque());
response.markResponseType();
response.setRemark(null);
return response;
}
private RemotingCommand createErrorResponse4UpdateAclConfig(RemotingCommand request) {
RemotingCommand response = RemotingCommand.createResponseCommand(null);
response.setCode(ResponseCode.UPDATE_AND_CREATE_ACL_CONFIG_FAILED);
response.setOpaque(request.getOpaque());
response.markResponseType();
response.setRemark("corresponding to accessConfig has been updated failed");
return response;
}
private RemotingCommand createErrorResponse4DeleteAclConfig(RemotingCommand request) {
RemotingCommand response = RemotingCommand.createResponseCommand(null);
response.setCode(ResponseCode.DELETE_ACL_CONFIG_FAILED);
response.setOpaque(request.getOpaque());
response.markResponseType();
response.setRemark("corresponding to accessConfig has been deleted failed");
return response;
}
private PlainAccessConfig createUpdateAclConfig() {
PlainAccessConfig config = new PlainAccessConfig();
config.setAccessKey("Rocketmq111");
config.setSecretKey("123456789");
config.setAdmin(true);
config.setWhiteRemoteAddress("127.0.0.1");
config.setDefaultTopicPerm("DENY");
config.setDefaultGroupPerm("SUB");
return config;
}
private SendMessageRequestHeader createSendMessageRequestHeader() { private SendMessageRequestHeader createSendMessageRequestHeader() {
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setBornTimestamp(System.currentTimeMillis()); requestHeader.setBornTimestamp(System.currentTimeMillis());
......
...@@ -200,9 +200,6 @@ public class DefaultMQConsumerWithTraceTest { ...@@ -200,9 +200,6 @@ public class DefaultMQConsumerWithTraceTest {
@Test @Test
public void testPullMessage_WithTrace_Success() throws InterruptedException, RemotingException, MQBrokerException, MQClientException { public void testPullMessage_WithTrace_Success() throws InterruptedException, RemotingException, MQBrokerException, MQClientException {
traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl()); traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());
//when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
//when(mQClientTraceAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTraceTopicRoute());
final CountDownLatch countDownLatch = new CountDownLatch(1); final CountDownLatch countDownLatch = new CountDownLatch(1);
final MessageExt[] messageExts = new MessageExt[1]; final MessageExt[] messageExts = new MessageExt[1];
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common;
import java.util.List;
public class PlainAccessConfig {
private String accessKey;
private String secretKey;
private String whiteRemoteAddress;
private boolean admin;
private String defaultTopicPerm;
private String defaultGroupPerm;
private List<String> topicPerms;
private List<String> groupPerms;
public String getAccessKey() {
return accessKey;
}
public void setAccessKey(String accessKey) {
this.accessKey = accessKey;
}
public String getSecretKey() {
return secretKey;
}
public void setSecretKey(String secretKey) {
this.secretKey = secretKey;
}
public String getWhiteRemoteAddress() {
return whiteRemoteAddress;
}
public void setWhiteRemoteAddress(String whiteRemoteAddress) {
this.whiteRemoteAddress = whiteRemoteAddress;
}
public boolean isAdmin() {
return admin;
}
public void setAdmin(boolean admin) {
this.admin = admin;
}
public String getDefaultTopicPerm() {
return defaultTopicPerm;
}
public void setDefaultTopicPerm(String defaultTopicPerm) {
this.defaultTopicPerm = defaultTopicPerm;
}
public String getDefaultGroupPerm() {
return defaultGroupPerm;
}
public void setDefaultGroupPerm(String defaultGroupPerm) {
this.defaultGroupPerm = defaultGroupPerm;
}
public List<String> getTopicPerms() {
return topicPerms;
}
public void setTopicPerms(List<String> topicPerms) {
this.topicPerms = topicPerms;
}
public List<String> getGroupPerms() {
return groupPerms;
}
public void setGroupPerms(List<String> groupPerms) {
this.groupPerms = groupPerms;
}
}
...@@ -28,15 +28,17 @@ import java.net.NetworkInterface; ...@@ -28,15 +28,17 @@ import java.net.NetworkInterface;
import java.text.NumberFormat; import java.text.NumberFormat;
import java.text.ParseException; import java.text.ParseException;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Calendar; import java.util.Calendar;
import java.util.Date; import java.util.Date;
import java.util.Enumeration; import java.util.Enumeration;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.zip.CRC32; import java.util.zip.CRC32;
import java.util.zip.DeflaterOutputStream; import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream; import java.util.zip.InflaterInputStream;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
...@@ -529,4 +531,28 @@ public class UtilAll { ...@@ -529,4 +531,28 @@ public class UtilAll {
file.delete(); file.delete();
} }
} }
public static String List2String(List<String> list,String splitor) {
if (list == null || list.size() == 0) {
return null;
}
StringBuffer str = new StringBuffer();
for (int i = 0;i < list.size();i++) {
str.append(list.get(i));
if (i == list.size() - 1) {
continue;
}
str.append(splitor);
}
return str.toString();
}
public static List<String> String2List(String str,String splitor) {
if (StringUtils.isEmpty(str)) {
return null;
}
String[] addrArray = str.split(splitor);
return Arrays.asList(addrArray);
}
} }
...@@ -70,6 +70,14 @@ public class RequestCode { ...@@ -70,6 +70,14 @@ public class RequestCode {
public static final int CHECK_CLIENT_CONFIG = 46; public static final int CHECK_CLIENT_CONFIG = 46;
public static final int UPDATE_AND_CREATE_ACL_CONFIG = 50;
public static final int DELETE_ACL_CONFIG = 51;
public static final int GET_BROKER_CLUSTER_ACL_INFO = 52;
public static final int UPDATE_GLOBAL_WHITE_ADDRS_CONFIG = 53;
public static final int PUT_KV_CONFIG = 100; public static final int PUT_KV_CONFIG = 100;
public static final int GET_KV_CONFIG = 101; public static final int GET_KV_CONFIG = 101;
......
...@@ -73,4 +73,11 @@ public class ResponseCode extends RemotingSysResponseCode { ...@@ -73,4 +73,11 @@ public class ResponseCode extends RemotingSysResponseCode {
public static final int CONSUME_MSG_TIMEOUT = 207; public static final int CONSUME_MSG_TIMEOUT = 207;
public static final int NO_MESSAGE = 208; public static final int NO_MESSAGE = 208;
public static final int UPDATE_AND_CREATE_ACL_CONFIG_FAILED = 209;
public static final int DELETE_ACL_CONFIG_FAILED = 210;
public static final int UPDATE_GLOBAL_WHITE_ADDRS_CONFIG_FAILED = 211;
} }
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.body;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
public class ClusterAclVersionInfo extends RemotingSerializable {
private String brokerName;
private String brokerAddr;
private DataVersion aclConfigDataVersion;
private String clusterName;
public String getBrokerName() {
return brokerName;
}
public void setBrokerName(String brokerName) {
this.brokerName = brokerName;
}
public String getBrokerAddr() {
return brokerAddr;
}
public void setBrokerAddr(String brokerAddr) {
this.brokerAddr = brokerAddr;
}
public String getClusterName() {
return clusterName;
}
public void setClusterName(String clusterName) {
this.clusterName = clusterName;
}
public DataVersion getAclConfigDataVersion() {
return aclConfigDataVersion;
}
public void setAclConfigDataVersion(DataVersion aclConfigDataVersion) {
this.aclConfigDataVersion = aclConfigDataVersion;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class CreateAccessConfigRequestHeader implements CommandCustomHeader {
@CFNotNull
private String accessKey;
private String secretKey;
private String whiteRemoteAddress;
private boolean admin;
private String defaultTopicPerm;
private String defaultGroupPerm;
// list string,eg: topicA=DENY,topicD=SUB
private String topicPerms;
// list string,eg: groupD=DENY,groupD=SUB
private String groupPerms;
@Override public void checkFields() throws RemotingCommandException {
}
public String getAccessKey() {
return accessKey;
}
public void setAccessKey(String accessKey) {
this.accessKey = accessKey;
}
public String getSecretKey() {
return secretKey;
}
public void setSecretKey(String secretKey) {
this.secretKey = secretKey;
}
public String getWhiteRemoteAddress() {
return whiteRemoteAddress;
}
public void setWhiteRemoteAddress(String whiteRemoteAddress) {
this.whiteRemoteAddress = whiteRemoteAddress;
}
public boolean isAdmin() {
return admin;
}
public void setAdmin(boolean admin) {
this.admin = admin;
}
public String getDefaultTopicPerm() {
return defaultTopicPerm;
}
public void setDefaultTopicPerm(String defaultTopicPerm) {
this.defaultTopicPerm = defaultTopicPerm;
}
public String getDefaultGroupPerm() {
return defaultGroupPerm;
}
public void setDefaultGroupPerm(String defaultGroupPerm) {
this.defaultGroupPerm = defaultGroupPerm;
}
public String getTopicPerms() {
return topicPerms;
}
public void setTopicPerms(String topicPerms) {
this.topicPerms = topicPerms;
}
public String getGroupPerms() {
return groupPerms;
}
public void setGroupPerms(String groupPerms) {
this.groupPerms = groupPerms;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class DeleteAccessConfigRequestHeader implements CommandCustomHeader {
@CFNotNull
private String accessKey;
@Override public void checkFields() throws RemotingCommandException {
}
public String getAccessKey() {
return accessKey;
}
public void setAccessKey(String accessKey) {
this.accessKey = accessKey;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class GetBrokerAclConfigResponseHeader implements CommandCustomHeader {
@CFNotNull
private String version;
@CFNotNull
private String brokerName;
@CFNotNull
private String brokerAddr;
@CFNotNull
private String clusterName;
@Override public void checkFields() throws RemotingCommandException {
}
public String getVersion() {
return version;
}
public void setVersion(String version) {
this.version = version;
}
public String getBrokerName() {
return brokerName;
}
public void setBrokerName(String brokerName) {
this.brokerName = brokerName;
}
public String getBrokerAddr() {
return brokerAddr;
}
public void setBrokerAddr(String brokerAddr) {
this.brokerAddr = brokerAddr;
}
public String getClusterName() {
return clusterName;
}
public void setClusterName(String clusterName) {
this.clusterName = clusterName;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class UpdateGlobalWhiteAddrsConfigRequestHeader implements CommandCustomHeader {
@CFNotNull
private String globalWhiteAddrs;
@Override public void checkFields() throws RemotingCommandException {
}
public String getGlobalWhiteAddrs() {
return globalWhiteAddrs;
}
public void setGlobalWhiteAddrs(String globalWhiteAddrs) {
this.globalWhiteAddrs = globalWhiteAddrs;
}
}
...@@ -82,5 +82,76 @@ RocketMQ的权限控制存储的默认实现是基于yml配置文件。用户可 ...@@ -82,5 +82,76 @@ RocketMQ的权限控制存储的默认实现是基于yml配置文件。用户可
(2)如果ACL与高可用部署(多副本Dledger架构)同时启用,由于出现节点宕机时,Dledger Group组内会自动选主,那么就需要将Dledger Group组 (2)如果ACL与高可用部署(多副本Dledger架构)同时启用,由于出现节点宕机时,Dledger Group组内会自动选主,那么就需要将Dledger Group组
内所有Broker节点的plain_acl.yml配置文件的白名单设置所有Broker节点的ip地址。 内所有Broker节点的plain_acl.yml配置文件的白名单设置所有Broker节点的ip地址。
**特别注意**[4.5.0]版本中即使使用上面所述的白名单也无法解决开启ACL的问题,解决该问题的[PR链接](https://github.com/apache/rocketmq/pull/1149) ## 7. ACL mqadmin配置管理命令
### 7.1 更新ACL配置文件中“account”的属性值
该命令的示例如下:
sh mqadmin updateAclConfig -n 192.168.1.2:9876 -b 192.168.12.134:10911 -a RocketMQ -s 1234567809123
-t topicA=DENY,topicD=SUB -g groupD=DENY,groupB=SUB
说明:如果不存在则会在ACL Config YAML配置文件中创建;若存在,则会更新对应的“accounts”的属性值;
如果指定的是集群名称,则会在集群中各个broker节点执行该命令;否则会在单个broker节点执行该命令。
| 参数 | 取值 | 含义 |
| --- | --- | --- |
| n | eg:192.168.1.2:9876 | namesrv地址(必填) |
| c | eg:DefaultCluster | 指定集群名称(与broker地址二选一) |
| b | eg:192.168.12.134:10911 | 指定broker地址(与集群名称二选一) |
| a | eg:RocketMQ | Access Key值(必填) |
| s | eg:1234567809123 | Secret Key值(可选) |
| m | eg:true | 是否管理员账户(可选) |
| w | eg:192.168.0.* | whiteRemoteAddress,用户IP白名单(可选) |
| i | eg:DENY;PUB;SUB;PUB\|SUB | defaultTopicPerm,默认Topic权限(可选) |
| u | eg:DENY;PUB;SUB;PUB\|SUB | defaultGroupPerm,默认ConsumerGroup权限(可选) |
| t | eg:topicA=DENY,topicD=SUB | topicPerms,各个Topic的权限(可选) |
| g | eg:groupD=DENY,groupB=SUB | groupPerms,各个ConsumerGroup的权限(可选) |
### 7.2 删除ACL配置文件里面的对应“account”
该命令的示例如下:
sh mqadmin deleteAccessConfig -n 192.168.1.2:9876 -c DefaultCluster -a RocketMQ
说明:如果指定的是集群名称,则会在集群中各个broker节点执行该命令;否则会在单个broker节点执行该命令。
其中,参数"a"为Access Key的值,用以标识唯一账户id,因此该命令的参数中指定账户id即可。
| 参数 | 取值 | 含义 |
| --- | --- | --- |
| n | eg:192.168.1.2:9876 | namesrv地址(必填) |
| c | eg:DefaultCluster | 指定集群名称(与broker地址二选一) |
| b | eg:192.168.12.134:10911 | 指定broker地址(与集群名称二选一) |
| a | eg:RocketMQ | Access Key的值(必填) |
### 7.3 更新ACL配置文件里面中的全局白名单
该命令的示例如下:
sh mqadmin updateGlobalWhiteAddr -n 192.168.1.2:9876 -b 192.168.12.134:10911 -g 10.10.154.1,10.10.154.2
说明:如果指定的是集群名称,则会在集群中各个broker节点执行该命令;否则会在单个broker节点执行该命令。
其中,参数"g"为全局IP白名的值,用以更新ACL配置文件中的“globalWhiteRemoteAddresses”字段的属性值。
| 参数 | 取值 | 含义 |
| --- | --- | --- |
| n | eg:192.168.1.2:9876 | namesrv地址(必填) |
| c | eg:DefaultCluster | 指定集群名称(与broker地址二选一) |
| b | eg:192.168.12.134:10911 | 指定broker地址(与集群名称二选一) |
| g | eg:10.10.154.1,10.10.154.2 | 全局IP白名单(必填) |
### 7.4 查询集群/Broker的ACL配置文件版本信息
该命令的示例如下:
sh mqadmin clusterAclConfigVersion -n 192.168.1.2:9876 -c DefaultCluster
说明:如果指定的是集群名称,则会在集群中各个broker节点执行该命令;否则会在单个broker节点执行该命令。
| 参数 | 取值 | 含义 |
| --- | --- | --- |
| n | eg:192.168.1.2:9876 | namesrv地址(必填) |
| c | eg:DefaultCluster | 指定集群名称(与broker地址二选一) |
| b | eg:192.168.12.134:10911 | 指定broker地址(与集群名称二选一) |
**特别注意**开启Acl鉴权认证后导致Master/Slave和Dledger模式下Broker同步数据异常的问题,
在社区[4.5.1]版本中已经修复,具体的PR链接为:https://github.com/apache/rocketmq/pull/1149;
\ No newline at end of file
...@@ -26,6 +26,7 @@ import org.apache.rocketmq.client.QueryResult; ...@@ -26,6 +26,7 @@ import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.RollbackStats; import org.apache.rocketmq.common.admin.RollbackStats;
...@@ -33,6 +34,7 @@ import org.apache.rocketmq.common.admin.TopicStatsTable; ...@@ -33,6 +34,7 @@ import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.BrokerStatsData; import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo;
import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.body.ConsumeStatsList; import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
...@@ -160,6 +162,27 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { ...@@ -160,6 +162,27 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
defaultMQAdminExtImpl.createAndUpdateTopicConfig(addr, config); defaultMQAdminExtImpl.createAndUpdateTopicConfig(addr, config);
} }
@Override
public void createAndUpdatePlainAccessConfig(String addr,
PlainAccessConfig config) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
defaultMQAdminExtImpl.createAndUpdatePlainAccessConfig(addr, config);
}
@Override public void deletePlainAccessConfig(String addr,
String accessKey) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
defaultMQAdminExtImpl.deletePlainAccessConfig(addr, accessKey);
}
@Override public void updateGlobalWhiteAddrConfig(String addr,
String globalWhiteAddrs) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
defaultMQAdminExtImpl.updateGlobalWhiteAddrConfig(addr, globalWhiteAddrs);
}
@Override public ClusterAclVersionInfo examineBrokerClusterAclVersionInfo(
String addr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
return defaultMQAdminExtImpl.examineBrokerClusterAclVersionInfo(addr);
}
@Override @Override
public void createAndUpdateSubscriptionGroupConfig(String addr, public void createAndUpdateSubscriptionGroupConfig(String addr,
SubscriptionGroupConfig config) throws RemotingException, SubscriptionGroupConfig config) throws RemotingException,
...@@ -305,6 +328,7 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { ...@@ -305,6 +328,7 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
defaultMQAdminExtImpl.deleteKvConfig(namespace, key); defaultMQAdminExtImpl.deleteKvConfig(namespace, key);
} }
@Override
public List<RollbackStats> resetOffsetByTimestampOld(String consumerGroup, String topic, long timestamp, public List<RollbackStats> resetOffsetByTimestampOld(String consumerGroup, String topic, long timestamp,
boolean force) boolean force)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException { throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
......
...@@ -38,6 +38,7 @@ import org.apache.rocketmq.client.impl.MQClientManager; ...@@ -38,6 +38,7 @@ import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.ServiceState; import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
...@@ -47,6 +48,7 @@ import org.apache.rocketmq.common.admin.RollbackStats; ...@@ -47,6 +48,7 @@ import org.apache.rocketmq.common.admin.RollbackStats;
import org.apache.rocketmq.common.admin.TopicOffset; import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.help.FAQUrl; import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageClientExt; import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageConst;
...@@ -87,6 +89,7 @@ import org.apache.rocketmq.tools.admin.api.MessageTrack; ...@@ -87,6 +89,7 @@ import org.apache.rocketmq.tools.admin.api.MessageTrack;
import org.apache.rocketmq.tools.admin.api.TrackType; import org.apache.rocketmq.tools.admin.api.TrackType;
public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
private final InternalLogger log = ClientLogger.getLog(); private final InternalLogger log = ClientLogger.getLog();
private final DefaultMQAdminExt defaultMQAdminExt; private final DefaultMQAdminExt defaultMQAdminExt;
private ServiceState serviceState = ServiceState.CREATE_JUST; private ServiceState serviceState = ServiceState.CREATE_JUST;
...@@ -178,6 +181,27 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { ...@@ -178,6 +181,27 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
this.mqClientInstance.getMQClientAPIImpl().createTopic(addr, this.defaultMQAdminExt.getCreateTopicKey(), config, timeoutMillis); this.mqClientInstance.getMQClientAPIImpl().createTopic(addr, this.defaultMQAdminExt.getCreateTopicKey(), config, timeoutMillis);
} }
@Override public void createAndUpdatePlainAccessConfig(String addr,
PlainAccessConfig config) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
this.mqClientInstance.getMQClientAPIImpl().createPlainAccessConfig(addr, config, timeoutMillis);
}
@Override public void deletePlainAccessConfig(String addr,
String accessKey) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
this.mqClientInstance.getMQClientAPIImpl().deleteAccessConfig(addr, accessKey, timeoutMillis);
}
@Override public void updateGlobalWhiteAddrConfig(String addr,
String globalWhiteAddrs) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
this.mqClientInstance.getMQClientAPIImpl().updateGlobalWhiteAddrsConfig(addr, globalWhiteAddrs, timeoutMillis);
}
@Override
public ClusterAclVersionInfo examineBrokerClusterAclVersionInfo(
String addr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
return this.mqClientInstance.getMQClientAPIImpl().getBrokerClusterAclInfo(addr, timeoutMillis);
}
@Override @Override
public void createAndUpdateSubscriptionGroupConfig(String addr, public void createAndUpdateSubscriptionGroupConfig(String addr,
SubscriptionGroupConfig config) throws RemotingException, SubscriptionGroupConfig config) throws RemotingException,
...@@ -548,6 +572,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { ...@@ -548,6 +572,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
return Collections.EMPTY_MAP; return Collections.EMPTY_MAP;
} }
@Override
public void createOrUpdateOrderConf(String key, String value, public void createOrUpdateOrderConf(String key, String value,
boolean isCluster) throws RemotingException, MQBrokerException, boolean isCluster) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException { InterruptedException, MQClientException {
......
...@@ -24,6 +24,7 @@ import java.util.Set; ...@@ -24,6 +24,7 @@ import java.util.Set;
import org.apache.rocketmq.client.MQAdmin; import org.apache.rocketmq.client.MQAdmin;
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.RollbackStats; import org.apache.rocketmq.common.admin.RollbackStats;
...@@ -31,6 +32,7 @@ import org.apache.rocketmq.common.admin.TopicStatsTable; ...@@ -31,6 +32,7 @@ import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.BrokerStatsData; import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo;
import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.body.ConsumeStatsList; import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
...@@ -68,6 +70,18 @@ public interface MQAdminExt extends MQAdmin { ...@@ -68,6 +70,18 @@ public interface MQAdminExt extends MQAdmin {
final TopicConfig config) throws RemotingException, MQBrokerException, final TopicConfig config) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException; InterruptedException, MQClientException;
void createAndUpdatePlainAccessConfig(final String addr, final PlainAccessConfig plainAccessConfig) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException;
void deletePlainAccessConfig(final String addr, final String accessKey) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException;
void updateGlobalWhiteAddrConfig(final String addr, final String globalWhiteAddrs)throws RemotingException, MQBrokerException,
InterruptedException, MQClientException;
ClusterAclVersionInfo examineBrokerClusterAclVersionInfo(final String addr) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException;
void createAndUpdateSubscriptionGroupConfig(final String addr, void createAndUpdateSubscriptionGroupConfig(final String addr,
final SubscriptionGroupConfig config) throws RemotingException, final SubscriptionGroupConfig config) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException; MQBrokerException, InterruptedException, MQClientException;
......
...@@ -30,6 +30,10 @@ import org.apache.rocketmq.common.MixAll; ...@@ -30,6 +30,10 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.srvutil.ServerUtil; import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.command.acl.ClusterAclConfigVersionListSubCommand;
import org.apache.rocketmq.tools.command.acl.DeleteAccessConfigSubCommand;
import org.apache.rocketmq.tools.command.acl.UpdateAccessConfigSubCommand;
import org.apache.rocketmq.tools.command.acl.UpdateGlobalWhiteAddrSubCommand;
import org.apache.rocketmq.tools.command.broker.BrokerConsumeStatsSubCommad; import org.apache.rocketmq.tools.command.broker.BrokerConsumeStatsSubCommad;
import org.apache.rocketmq.tools.command.broker.BrokerStatusSubCommand; import org.apache.rocketmq.tools.command.broker.BrokerStatusSubCommand;
import org.apache.rocketmq.tools.command.broker.CleanExpiredCQSubCommand; import org.apache.rocketmq.tools.command.broker.CleanExpiredCQSubCommand;
...@@ -199,6 +203,12 @@ public class MQAdminStartup { ...@@ -199,6 +203,12 @@ public class MQAdminStartup {
initCommand(new QueryConsumeQueueCommand()); initCommand(new QueryConsumeQueueCommand());
initCommand(new SendMessageCommand()); initCommand(new SendMessageCommand());
initCommand(new ConsumeMessageCommand()); initCommand(new ConsumeMessageCommand());
//for acl command
initCommand(new UpdateAccessConfigSubCommand());
initCommand(new DeleteAccessConfigSubCommand());
initCommand(new ClusterAclConfigVersionListSubCommand());
initCommand(new UpdateGlobalWhiteAddrSubCommand());
} }
private static void initLogback() throws JoranException { private static void initLogback() throws JoranException {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.acl;
import java.sql.Timestamp;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Set;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class ClusterAclConfigVersionListSubCommand implements SubCommand {
@Override public String commandName() {
return "clusterAclConfigVersion";
}
@Override public String commandDesc() {
return "List all of acl config version information in cluster";
}
@Override public Options buildCommandlineOptions(Options options) {
OptionGroup optionGroup = new OptionGroup();
Option opt = new Option("b", "brokerAddr", true, "query acl config version for which broker");
optionGroup.addOption(opt);
opt = new Option("c", "clusterName", true, "query acl config version for specified cluster");
optionGroup.addOption(opt);
optionGroup.setRequired(true);
options.addOptionGroup(optionGroup);
return options;
}
@Override public void execute(CommandLine commandLine, Options options,
RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
if (commandLine.hasOption('b')) {
String addr = commandLine.getOptionValue('b').trim();
defaultMQAdminExt.start();
printClusterBaseInfo(defaultMQAdminExt, addr);
System.out.printf("get broker's plain access config version success.%n", addr);
return;
} else if (commandLine.hasOption('c')) {
String clusterName = commandLine.getOptionValue('c').trim();
defaultMQAdminExt.start();
Set<String> masterSet =
CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
System.out.printf("%-16s %-22s %-22s %-20s %-22s%n",
"#Cluster Name",
"#Broker Name",
"#Broker Addr",
"#AclConfigVersionNum",
"#AclLastUpdateTime"
);
for (String addr : masterSet) {
printClusterBaseInfo(defaultMQAdminExt, addr);
}
System.out.printf("get cluster's plain access config version success.%n");
return;
}
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
}
private void printClusterBaseInfo(
final DefaultMQAdminExt defaultMQAdminExt, final String addr) throws
InterruptedException, MQBrokerException, RemotingException, MQClientException {
ClusterAclVersionInfo clusterAclVersionInfo = defaultMQAdminExt.examineBrokerClusterAclVersionInfo(addr);
DataVersion aclDataVersion = clusterAclVersionInfo.getAclConfigDataVersion();
String versionNum = String.valueOf(aclDataVersion.getCounter());
DateFormat sdf = new SimpleDateFormat(UtilAll.YYYY_MM_DD_HH_MM_SS);
String timeStampStr = sdf.format(new Timestamp(aclDataVersion.getTimestamp()));
System.out.printf("%-16s %-22s %-22s %-20s %-22s%n",
clusterAclVersionInfo.getClusterName(),
clusterAclVersionInfo.getBrokerName(),
clusterAclVersionInfo.getBrokerAddr(),
versionNum,
timeStampStr
);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.acl;
import java.util.Set;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class DeleteAccessConfigSubCommand implements SubCommand {
@Override
public String commandName() {
return "deleteAccessConfig";
}
@Override
public String commandDesc() {
return "Delete Acl Config Account in broker";
}
@Override
public Options buildCommandlineOptions(Options options) {
OptionGroup optionGroup = new OptionGroup();
Option opt = new Option("b", "brokerAddr", true, "delete acl config account to which broker");
optionGroup.addOption(opt);
opt = new Option("c", "clusterName", true, "delete cl config account to which cluster");
optionGroup.addOption(opt);
optionGroup.setRequired(true);
options.addOptionGroup(optionGroup);
opt = new Option("a", "accessKey", true, "set accessKey in acl config file for deleting which account");
opt.setRequired(true);
options.addOption(opt);
return options;
}
@Override public void execute(CommandLine commandLine, Options options,
RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
String accessKey = commandLine.getOptionValue('a').trim();
if (commandLine.hasOption('b')) {
String addr = commandLine.getOptionValue('b').trim();
defaultMQAdminExt.start();
defaultMQAdminExt.deletePlainAccessConfig(addr, accessKey);
System.out.printf("delete plain access config account to %s success.%n", addr);
System.out.printf("account's accesskey is:%s", accessKey);
return;
} else if (commandLine.hasOption('c')) {
String clusterName = commandLine.getOptionValue('c').trim();
defaultMQAdminExt.start();
Set<String> masterSet =
CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
for (String addr : masterSet) {
defaultMQAdminExt.deletePlainAccessConfig(addr, accessKey);
System.out.printf("delete plain access config account to %s success.%n", addr);
}
System.out.printf("account's accesskey is:%s", accessKey);
return;
}
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.acl;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class UpdateAccessConfigSubCommand implements SubCommand {
@Override
public String commandName() {
return "updateAclConfig";
}
@Override
public String commandDesc() {
return "Update acl config yaml file in broker";
}
@Override
public Options buildCommandlineOptions(Options options) {
OptionGroup optionGroup = new OptionGroup();
Option opt = new Option("b", "brokerAddr", true, "update acl config file to which broker");
optionGroup.addOption(opt);
opt = new Option("c", "clusterName", true, "update cl config file to which cluster");
optionGroup.addOption(opt);
optionGroup.setRequired(true);
options.addOptionGroup(optionGroup);
opt = new Option("a", "accessKey", true, "set accessKey in acl config file");
opt.setRequired(true);
options.addOption(opt);
opt = new Option("s", "secretKey", true, "set secretKey in acl config file");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("w", "whiteRemoteAddress", true, "set white ip Address for account in acl config file");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("i", "defaultTopicPerm", true, "set default topicPerm in acl config file");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("u", "defaultGroupPerm", true, "set default GroupPerm in acl config file");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("t", "topicPerms", true, "set topicPerms list,eg: topicA=DENY,topicD=SUB");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("g", "groupPerms", true, "set groupPerms list,eg: groupD=DENY,groupD=SUB");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("m", "admin", true, "set admin flag in acl config file");
opt.setRequired(false);
options.addOption(opt);
return options;
}
@Override
public void execute(CommandLine commandLine, Options options,
RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
PlainAccessConfig accessConfig = new PlainAccessConfig();
accessConfig.setAccessKey(commandLine.getOptionValue('a').trim());
// Secretkey
if (commandLine.hasOption('s')) {
accessConfig.setSecretKey(commandLine.getOptionValue('s').trim());
}
// Admin
if (commandLine.hasOption('m')) {
accessConfig.setAdmin(Boolean.parseBoolean(commandLine.getOptionValue('m').trim()));
}
// DefaultTopicPerm
if (commandLine.hasOption('i')) {
accessConfig.setDefaultTopicPerm(commandLine.getOptionValue('i').trim());
}
// DefaultGroupPerm
if (commandLine.hasOption('u')) {
accessConfig.setDefaultGroupPerm(commandLine.getOptionValue('u').trim());
}
// WhiteRemoteAddress
if (commandLine.hasOption('w')) {
accessConfig.setWhiteRemoteAddress(commandLine.getOptionValue('w').trim());
}
// TopicPerms list value
if (commandLine.hasOption('t')) {
String[] topicPerms = commandLine.getOptionValue('t').trim().split(",");
List<String> topicPermList = new ArrayList<String>();
if (topicPerms != null) {
for (String topicPerm : topicPerms) {
topicPermList.add(topicPerm);
}
}
accessConfig.setTopicPerms(topicPermList);
}
// GroupPerms list value
if (commandLine.hasOption('g')) {
String[] groupPerms = commandLine.getOptionValue('g').trim().split(",");
List<String> groupPermList = new ArrayList<String>();
if (groupPerms != null) {
for (String groupPerm : groupPerms) {
groupPermList.add(groupPerm);
}
}
accessConfig.setGroupPerms(groupPermList);
}
if (commandLine.hasOption('b')) {
String addr = commandLine.getOptionValue('b').trim();
defaultMQAdminExt.start();
defaultMQAdminExt.createAndUpdatePlainAccessConfig(addr, accessConfig);
System.out.printf("create or update plain access config to %s success.%n", addr);
System.out.printf("%s", accessConfig);
return;
} else if (commandLine.hasOption('c')) {
String clusterName = commandLine.getOptionValue('c').trim();
defaultMQAdminExt.start();
Set<String> masterSet =
CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
for (String addr : masterSet) {
defaultMQAdminExt.createAndUpdatePlainAccessConfig(addr, accessConfig);
System.out.printf("create or update plain access config to %s success.%n", addr);
}
System.out.printf("%s", accessConfig);
return;
}
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.acl;
import java.util.Set;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class UpdateGlobalWhiteAddrSubCommand implements SubCommand {
@Override public String commandName() {
return "updateGlobalWhiteAddr";
}
@Override public String commandDesc() {
return "Update global white address for acl Config File in broker";
}
@Override public Options buildCommandlineOptions(Options options) {
OptionGroup optionGroup = new OptionGroup();
Option opt = new Option("b", "brokerAddr", true, "update global white address to which broker");
optionGroup.addOption(opt);
opt = new Option("c", "clusterName", true, "update global white address to which cluster");
optionGroup.addOption(opt);
optionGroup.setRequired(true);
options.addOptionGroup(optionGroup);
opt = new Option("g", "globalWhiteRemoteAddresses", true, "set globalWhiteRemoteAddress list,eg: 10.10.103.*,192.168.0.*");
opt.setRequired(true);
options.addOption(opt);
return options;
}
@Override public void execute(CommandLine commandLine, Options options,
RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
// GlobalWhiteRemoteAddresses list value
String globalWhiteRemoteAddresses = commandLine.getOptionValue('g').trim();
if (commandLine.hasOption('b')) {
String addr = commandLine.getOptionValue('b').trim();
defaultMQAdminExt.start();
defaultMQAdminExt.updateGlobalWhiteAddrConfig(addr, globalWhiteRemoteAddresses);
System.out.printf("update global white remote addresses to %s success.%n", addr);
return;
} else if (commandLine.hasOption('c')) {
String clusterName = commandLine.getOptionValue('c').trim();
defaultMQAdminExt.start();
Set<String> masterSet =
CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
for (String addr : masterSet) {
defaultMQAdminExt.updateGlobalWhiteAddrConfig(addr, globalWhiteRemoteAddresses);
System.out.printf("update global white remote addresses to %s success.%n", addr);
}
return;
}
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.acl;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
public class ClusterAclConfigVersionListSubCommandTest {
@Test
public void testExecute() {
ClusterAclConfigVersionListSubCommand cmd = new ClusterAclConfigVersionListSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-c default-cluster"};
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
assertThat(commandLine.getOptionValue('c').trim()).isEqualTo("default-cluster");
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.acl;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
public class DeleteAccessConfigSubCommandTest {
@Test
public void testExecute() {
DeleteAccessConfigSubCommand cmd = new DeleteAccessConfigSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-a unit-test", "-c default-cluster"};
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
assertThat(commandLine.getOptionValue('a').trim()).isEqualTo("unit-test");
assertThat(commandLine.getOptionValue('c').trim()).isEqualTo("default-cluster");
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.acl;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.junit.Assert;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
public class UpdateAccessConfigSubCommandTest {
@Test
public void testExecute() {
UpdateAccessConfigSubCommand cmd = new UpdateAccessConfigSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {
"-b 127.0.0.1:10911",
"-a RocketMQ",
"-s 12345678",
"-w 192.168.0.*",
"-i DENY",
"-u SUB",
"-t topicA=DENY;topicB=PUB|SUB",
"-g groupA=DENY;groupB=SUB",
"-m true"};
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
assertThat(commandLine.getOptionValue('b').trim()).isEqualTo("127.0.0.1:10911");
assertThat(commandLine.getOptionValue('a').trim()).isEqualTo("RocketMQ");
assertThat(commandLine.getOptionValue('s').trim()).isEqualTo("12345678");
assertThat(commandLine.getOptionValue('w').trim()).isEqualTo("192.168.0.*");
assertThat(commandLine.getOptionValue('i').trim()).isEqualTo("DENY");
assertThat(commandLine.getOptionValue('u').trim()).isEqualTo("SUB");
assertThat(commandLine.getOptionValue('t').trim()).isEqualTo("topicA=DENY;topicB=PUB|SUB");
assertThat(commandLine.getOptionValue('g').trim()).isEqualTo("groupA=DENY;groupB=SUB");
assertThat(commandLine.getOptionValue('m').trim()).isEqualTo("true");
PlainAccessConfig accessConfig = new PlainAccessConfig();
// topicPerms list value
if (commandLine.hasOption('t')) {
String[] topicPerms = commandLine.getOptionValue('t').trim().split(";");
List<String> topicPermList = new ArrayList<String>();
if (topicPerms != null) {
for (String topicPerm : topicPerms) {
topicPermList.add(topicPerm);
}
}
accessConfig.setTopicPerms(topicPermList);
}
// groupPerms list value
if (commandLine.hasOption('g')) {
String[] groupPerms = commandLine.getOptionValue('g').trim().split(";");
List<String> groupPermList = new ArrayList<String>();
if (groupPerms != null) {
for (String groupPerm : groupPerms) {
groupPermList.add(groupPerm);
}
}
accessConfig.setGroupPerms(groupPermList);
}
Assert.assertTrue(accessConfig.getTopicPerms().contains("topicB=PUB|SUB"));
Assert.assertTrue(accessConfig.getGroupPerms().contains("groupB=SUB"));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.acl;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
public class UpdateGlobalWhiteAddrSubCommandTest {
@Test
public void testExecute() {
UpdateGlobalWhiteAddrSubCommand cmd = new UpdateGlobalWhiteAddrSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-g 10.10.103.*,192.168.0.*", "-c default-cluster"};
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
assertThat(commandLine.getOptionValue('g').trim()).isEqualTo("10.10.103.*,192.168.0.*");
assertThat(commandLine.getOptionValue('c').trim()).isEqualTo("default-cluster");
}
}
...@@ -31,7 +31,6 @@ public class UpdateTopicSubCommandTest { ...@@ -31,7 +31,6 @@ public class UpdateTopicSubCommandTest {
Options options = ServerUtil.buildCommandlineOptions(new Options()); Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] { String[] subargs = new String[] {
"-b 127.0.0.1:10911", "-b 127.0.0.1:10911",
"-c default-cluster",
"-t unit-test", "-t unit-test",
"-r 8", "-r 8",
"-w 8", "-w 8",
...@@ -42,7 +41,6 @@ public class UpdateTopicSubCommandTest { ...@@ -42,7 +41,6 @@ public class UpdateTopicSubCommandTest {
final CommandLine commandLine = final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
assertThat(commandLine.getOptionValue('b').trim()).isEqualTo("127.0.0.1:10911"); assertThat(commandLine.getOptionValue('b').trim()).isEqualTo("127.0.0.1:10911");
assertThat(commandLine.getOptionValue('c').trim()).isEqualTo("default-cluster");
assertThat(commandLine.getOptionValue('r').trim()).isEqualTo("8"); assertThat(commandLine.getOptionValue('r').trim()).isEqualTo("8");
assertThat(commandLine.getOptionValue('w').trim()).isEqualTo("8"); assertThat(commandLine.getOptionValue('w').trim()).isEqualTo("8");
assertThat(commandLine.getOptionValue('t').trim()).isEqualTo("unit-test"); assertThat(commandLine.getOptionValue('t').trim()).isEqualTo("unit-test");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册