提交 f6c05db9 编写于 作者: S shenhui.backend

resolved PR 1516 merge conflicts

......@@ -13,7 +13,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.5.2</version>
<version>4.6.0-SNAPSHOT</version>
</parent>
<artifactId>rocketmq-acl</artifactId>
<name>rocketmq-acl ${project.version}</name>
......@@ -67,6 +67,10 @@
<artifactId>logback-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-validator</groupId>
<artifactId>commons-validator</artifactId>
</dependency>
</dependencies>
</project>
......@@ -18,6 +18,7 @@
package org.apache.rocketmq.acl;
import java.util.List;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
......@@ -66,4 +67,10 @@ public interface AccessValidator {
* @return
*/
boolean updateGlobalWhiteAddrsConfig(List<String> globalWhiteAddrsList);
/**
* get broker cluster acl config information
* @return
*/
AclConfig getAllAclConfig();
}
......@@ -23,6 +23,7 @@ import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Map;
import java.util.SortedMap;
import org.apache.commons.lang3.StringUtils;
......@@ -69,24 +70,75 @@ public class AclUtils {
return signature;
}
public static void IPv6AddressCheck(String netaddress) {
if (isAsterisk(netaddress) || isMinus(netaddress)) {
int asterisk = netaddress.indexOf("*");
int minus = netaddress.indexOf("-");
// '*' must be the end of netaddress if it exists
if (asterisk > -1 && asterisk != netaddress.length() - 1) {
throw new AclException(String.format("Netaddress examine scope Exception netaddress is %s", netaddress));
}
// format like "2::ac5:78:1-200:*" or "2::ac5:78:1-200" is legal
if (minus > -1) {
if (asterisk == -1) {
if (minus <= netaddress.lastIndexOf(":")) {
throw new AclException(String.format("Netaddress examine scope Exception netaddress is %s", netaddress));
}
} else {
if (minus <= netaddress.lastIndexOf(":", netaddress.lastIndexOf(":") - 1)) {
throw new AclException(String.format("Netaddress examine scope Exception netaddress is %s", netaddress));
}
}
}
}
}
public static String v6ipProcess(String netaddress, String[] strArray, int index) {
int part;
String subAddress;
boolean isAsterisk = isAsterisk(netaddress);
boolean isMinus = isMinus(netaddress);
if (isAsterisk && isMinus) {
part = 6;
int lastColon = netaddress.lastIndexOf(':');
int secondLastColon = netaddress.substring(0, lastColon).lastIndexOf(':');
subAddress = netaddress.substring(0, secondLastColon);
} else if (!isAsterisk && !isMinus) {
part = 8;
subAddress = netaddress;
} else {
part = 7;
subAddress = netaddress.substring(0, netaddress.lastIndexOf(':'));
}
return expandIP(subAddress, part);
}
public static void verify(String netaddress, int index) {
if (!AclUtils.isScope(netaddress, index)) {
throw new AclException(String.format("Netaddress examine scope Exception netaddress is %s", netaddress));
}
}
public static String[] getAddreeStrArray(String netaddress, String four) {
String[] fourStrArray = StringUtils.split(four.substring(1, four.length() - 1), ",");
public static String[] getAddreeStrArray(String netaddress, String partialAddress) {
String[] parAddStrArray = StringUtils.split(partialAddress.substring(1, partialAddress.length() - 1), ",");
String address = netaddress.substring(0, netaddress.indexOf("{"));
String[] addreeStrArray = new String[fourStrArray.length];
for (int i = 0; i < fourStrArray.length; i++) {
addreeStrArray[i] = address + fourStrArray[i];
String[] addreeStrArray = new String[parAddStrArray.length];
for (int i = 0; i < parAddStrArray.length; i++) {
addreeStrArray[i] = address + parAddStrArray[i];
}
return addreeStrArray;
}
public static boolean isScope(String num, int index) {
String[] strArray = StringUtils.split(num, ".");
public static boolean isScope(String netaddress, int index) {
// IPv6 Address
if (isColon(netaddress)) {
netaddress = expandIP(netaddress, 8);
String[] strArray = StringUtils.split(netaddress, ":");
return isIPv6Scope(strArray, index);
}
String[] strArray = StringUtils.split(netaddress, ".");
if (strArray.length != 4) {
return false;
}
......@@ -107,6 +159,10 @@ public class AclUtils {
}
public static boolean isColon(String netaddress) {
return netaddress.indexOf(':') > -1;
}
public static boolean isScope(String num) {
return isScope(Integer.valueOf(num.trim()));
}
......@@ -119,7 +175,7 @@ public class AclUtils {
return asterisk.indexOf('*') > -1;
}
public static boolean isColon(String colon) {
public static boolean isComma(String colon) {
return colon.indexOf(',') > -1;
}
......@@ -128,6 +184,88 @@ public class AclUtils {
}
public static boolean isIPv6Scope(String[] num, int index) {
for (int i = 0; i < index; i++) {
int value;
try {
value = Integer.parseInt(num[i], 16);
} catch (NumberFormatException e) {
return false;
}
if (!isIPv6Scope(value)) {
return false;
}
}
return true;
}
public static boolean isIPv6Scope(int num) {
int min = Integer.parseInt("0", 16);
int max = Integer.parseInt("ffff", 16);
return num >= min && num <= max;
}
public static String expandIP(String netaddress, int part) {
boolean compress = false;
int compressIndex = -1;
String[] strArray = StringUtils.split(netaddress, ":");
ArrayList<Integer> indexes = new ArrayList<>();
for (int i = 0; i < netaddress.length(); i++) {
if (netaddress.charAt(i) == ':') {
if (indexes.size() > 0 && i - indexes.get(indexes.size() - 1) == 1) {
compressIndex = i;
compress = true;
}
indexes.add(i);
}
}
for (int i = 0; i < strArray.length; i++) {
if (strArray[i].length() < 4) {
strArray[i] = "0000".substring(0, 4 - strArray[i].length()) + strArray[i];
}
}
StringBuilder sb = new StringBuilder();
if (compress) {
int pos = indexes.indexOf(compressIndex);
int index = 0;
if (!netaddress.startsWith(":")) {
for (int i = 0; i < pos; i++) {
sb.append(strArray[index]).append(":");
index += 1;
}
}
int zeroNum = part - strArray.length;
if (netaddress.endsWith(":")) {
for (int i = 0; i < zeroNum; i++) {
sb.append("0000");
if (i != zeroNum - 1) {
sb.append(":");
}
}
} else {
for (int i = 0; i < zeroNum; i++) {
sb.append("0000").append(":");
}
for (int i = index; i < strArray.length; i++) {
sb.append(strArray[i]);
if (i != strArray.length - 1) {
sb.append(":");
}
}
}
} else {
for (int i = 0; i < strArray.length; i++) {
sb.append(strArray[i]);
if (i != strArray.length - 1) {
sb.append(":");
}
}
}
return sb.toString().toUpperCase();
}
public static <T> T getYamlDataObject(String path, Class<T> clazz) {
Yaml yaml = new Yaml();
FileInputStream fis = null;
......@@ -148,7 +286,7 @@ public class AclUtils {
}
}
public static boolean writeDataObject(String path, Map<String,Object> dataMap) {
public static boolean writeDataObject(String path, Map<String, Object> dataMap) {
Yaml yaml = new Yaml();
PrintWriter pw = null;
try {
......@@ -172,15 +310,15 @@ public class AclUtils {
yamlDataObject = AclUtils.getYamlDataObject(fileName,
JSONObject.class);
} catch (Exception e) {
log.error("Convert yaml file to data object error, ",e);
log.error("Convert yaml file to data object error, ", e);
return null;
}
if (yamlDataObject == null || yamlDataObject.isEmpty()) {
log.warn("Cannot find conf file :{}, acl isn't be enabled." ,fileName);
log.warn("Cannot find conf file :{}, acl isn't be enabled.", fileName);
return null;
}
String accessKey = yamlDataObject.getString(AclConstants.CONFIG_ACCESS_KEY);
String secretKey = yamlDataObject.getString(AclConstants.CONFIG_SECRET_KEY);
......@@ -189,7 +327,7 @@ public class AclUtils {
return null;
}
return new AclClientRPCHook(new SessionCredentials(accessKey,secretKey));
return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
}
}
......@@ -26,6 +26,7 @@ import org.apache.rocketmq.acl.common.AclException;
import org.apache.rocketmq.acl.common.AclUtils;
import org.apache.rocketmq.acl.common.Permission;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader;
......@@ -50,7 +51,7 @@ public class PlainAccessValidator implements AccessValidator {
public AccessResource parse(RemotingCommand request, String remoteAddr) {
PlainAccessResource accessResource = new PlainAccessResource();
if (remoteAddr != null && remoteAddr.contains(":")) {
accessResource.setWhiteRemoteAddress(remoteAddr.split(":")[0]);
accessResource.setWhiteRemoteAddress(remoteAddr.substring(0, remoteAddr.lastIndexOf(':')));
} else {
accessResource.setWhiteRemoteAddress(remoteAddr);
}
......@@ -155,4 +156,7 @@ public class PlainAccessValidator implements AccessValidator {
return aclPlugEngine.updateGlobalWhiteAddrsConfig(globalWhiteAddrsList);
}
@Override public AclConfig getAllAclConfig() {
return aclPlugEngine.getAllAclConfig();
}
}
......@@ -30,6 +30,7 @@ import org.apache.rocketmq.acl.common.AclConstants;
import org.apache.rocketmq.acl.common.AclException;
import org.apache.rocketmq.acl.common.AclUtils;
import org.apache.rocketmq.acl.common.Permission;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig;
......@@ -270,6 +271,28 @@ public class PlainPermissionManager {
return false;
}
public AclConfig getAllAclConfig() {
AclConfig aclConfig = new AclConfig();
List<PlainAccessConfig> configs = new ArrayList<>();
List<String> whiteAddrs = new ArrayList<>();
JSONObject plainAclConfData = AclUtils.getYamlDataObject(fileHome + File.separator + fileName,
JSONObject.class);
if (plainAclConfData == null || plainAclConfData.isEmpty()) {
throw new AclException(String.format("%s file is not data", fileHome + File.separator + fileName));
}
JSONArray globalWhiteAddrs = plainAclConfData.getJSONArray(AclConstants.CONFIG_GLOBAL_WHITE_ADDRS);
if (globalWhiteAddrs != null && !globalWhiteAddrs.isEmpty()) {
whiteAddrs = globalWhiteAddrs.toJavaList(String.class);
}
JSONArray accounts = plainAclConfData.getJSONArray(AclConstants.CONFIG_ACCOUNTS);
if (accounts != null && !accounts.isEmpty()) {
configs = accounts.toJavaList(PlainAccessConfig.class);
}
aclConfig.setGlobalWhiteAddrs(whiteAddrs);
aclConfig.setPlainAccessConfigs(configs);
return aclConfig;
}
private void watch() {
try {
String watchFilePath = fileHome + fileName;
......
......@@ -19,6 +19,7 @@ package org.apache.rocketmq.acl.plain;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.validator.routines.InetAddressValidator;
import org.apache.rocketmq.acl.common.AclException;
import org.apache.rocketmq.acl.common.AclUtils;
import org.apache.rocketmq.common.constant.LoggerName;
......@@ -41,17 +42,26 @@ public class RemoteAddressStrategyFactory {
if (StringUtils.isBlank(remoteAddr)) {
return BLANK_NET_ADDRESS_STRATEGY;
}
if ("*".equals(remoteAddr) || "*.*.*.*".equals(remoteAddr)) {
if ("*".equals(remoteAddr) || "*.*.*.*".equals(remoteAddr) || "*:*:*:*:*:*:*:*".equals(remoteAddr)) {
return NULL_NET_ADDRESS_STRATEGY;
}
if (remoteAddr.endsWith("}")) {
String[] strArray = StringUtils.split(remoteAddr, ".");
String four = strArray[3];
if (!four.startsWith("{")) {
throw new AclException(String.format("MultipleRemoteAddressStrategy netaddress examine scope Exception netaddress", remoteAddr));
if (AclUtils.isColon(remoteAddr)) {
String[] strArray = StringUtils.split(remoteAddr, ":");
String last = strArray[strArray.length - 1];
if (!last.startsWith("{")) {
throw new AclException(String.format("MultipleRemoteAddressStrategy netaddress examine scope Exception netaddress", remoteAddr));
}
return new MultipleRemoteAddressStrategy(AclUtils.getAddreeStrArray(remoteAddr, last));
} else {
String[] strArray = StringUtils.split(remoteAddr, ".");
String four = strArray[3];
if (!four.startsWith("{")) {
throw new AclException(String.format("MultipleRemoteAddressStrategy netaddress examine scope Exception netaddress", remoteAddr));
}
return new MultipleRemoteAddressStrategy(AclUtils.getAddreeStrArray(remoteAddr, four));
}
return new MultipleRemoteAddressStrategy(AclUtils.getAddreeStrArray(remoteAddr, four));
} else if (AclUtils.isColon(remoteAddr)) {
} else if (AclUtils.isComma(remoteAddr)) {
return new MultipleRemoteAddressStrategy(StringUtils.split(remoteAddr, ","));
} else if (AclUtils.isAsterisk(remoteAddr) || AclUtils.isMinus(remoteAddr)) {
return new RangeRemoteAddressStrategy(remoteAddr);
......@@ -81,15 +91,26 @@ public class RemoteAddressStrategyFactory {
private final Set<String> multipleSet = new HashSet<>();
public MultipleRemoteAddressStrategy(String[] strArray) {
InetAddressValidator validator = InetAddressValidator.getInstance();
for (String netaddress : strArray) {
AclUtils.verify(netaddress, 4);
multipleSet.add(netaddress);
if (validator.isValidInet4Address(netaddress)) {
multipleSet.add(netaddress);
} else if (validator.isValidInet6Address(netaddress)) {
multipleSet.add(AclUtils.expandIP(netaddress, 8));
} else {
throw new AclException(String.format("Netaddress examine Exception netaddress is %s", netaddress));
}
}
}
@Override
public boolean match(PlainAccessResource plainAccessResource) {
return multipleSet.contains(plainAccessResource.getWhiteRemoteAddress());
InetAddressValidator validator = InetAddressValidator.getInstance();
String whiteRemoteAddress = plainAccessResource.getWhiteRemoteAddress();
if (validator.isValidInet6Address(whiteRemoteAddress)) {
whiteRemoteAddress = AclUtils.expandIP(whiteRemoteAddress, 8);
}
return multipleSet.contains(whiteRemoteAddress);
}
}
......@@ -100,12 +121,16 @@ public class RemoteAddressStrategyFactory {
public OneRemoteAddressStrategy(String netaddress) {
this.netaddress = netaddress;
AclUtils.verify(netaddress, 4);
InetAddressValidator validator = InetAddressValidator.getInstance();
if (!(validator.isValidInet4Address(netaddress) || validator.isValidInet6Address(netaddress))) {
throw new AclException(String.format("Netaddress examine Exception netaddress is %s", netaddress));
}
}
@Override
public boolean match(PlainAccessResource plainAccessResource) {
return netaddress.equals(plainAccessResource.getWhiteRemoteAddress());
String writeRemoteAddress = AclUtils.expandIP(plainAccessResource.getWhiteRemoteAddress(), 8).toUpperCase();
return AclUtils.expandIP(netaddress, 8).toUpperCase().equals(writeRemoteAddress);
}
}
......@@ -121,14 +146,29 @@ public class RemoteAddressStrategyFactory {
private int index;
public RangeRemoteAddressStrategy(String remoteAddr) {
String[] strArray = StringUtils.split(remoteAddr, ".");
if (analysis(strArray, 1) || analysis(strArray, 2) || analysis(strArray, 3)) {
AclUtils.verify(remoteAddr, index - 1);
StringBuffer sb = new StringBuffer().append(strArray[0].trim()).append(".").append(strArray[1].trim()).append(".");
if (index == 3) {
sb.append(strArray[2].trim()).append(".");
// IPv6 Address
if (AclUtils.isColon(remoteAddr)) {
AclUtils.IPv6AddressCheck(remoteAddr);
String[] strArray = StringUtils.split(remoteAddr, ":");
for (int i = 1; i < strArray.length; i++) {
if (ipv6Analysis(strArray, i)) {
AclUtils.verify(remoteAddr, index - 1);
String preAddress = AclUtils.v6ipProcess(remoteAddr, strArray, index);
this.index = StringUtils.split(preAddress, ":").length;
this.head = preAddress;
break;
}
}
} else {
String[] strArray = StringUtils.split(remoteAddr, ".");
if (analysis(strArray, 1) || analysis(strArray, 2) || analysis(strArray, 3)) {
AclUtils.verify(remoteAddr, index - 1);
StringBuffer sb = new StringBuffer();
for (int j = 0; j < index; j++) {
sb.append(strArray[j].trim()).append(".");
}
this.head = sb.toString();
}
this.head = sb.toString();
}
}
......@@ -152,6 +192,27 @@ public class RemoteAddressStrategyFactory {
return this.end > 0 ? true : false;
}
private boolean ipv6Analysis(String[] strArray, int index) {
String value = strArray[index].trim();
this.index = index;
if ("*".equals(value)) {
int min = Integer.parseInt("0", 16);
int max = Integer.parseInt("ffff", 16);
setValue(min, max);
} else if (AclUtils.isMinus(value)) {
if (value.indexOf("-") == 0) {
throw new AclException(String.format("RangeRemoteAddressStrategy netaddress examine scope Exception value %s ", value));
}
String[] valueArray = StringUtils.split(value, "-");
this.start = Integer.parseInt(valueArray[0], 16);
this.end = Integer.parseInt(valueArray[1], 16);
if (!(AclUtils.isIPv6Scope(end) && AclUtils.isIPv6Scope(start) && start <= end)) {
throw new AclException(String.format("RangeRemoteAddressStrategy netaddress examine scope Exception start is %s , end is %s", start, end));
}
}
return this.end > 0 ? true : false;
}
private void setValue(int start, int end) {
this.start = start;
this.end = end;
......@@ -160,21 +221,33 @@ public class RemoteAddressStrategyFactory {
@Override
public boolean match(PlainAccessResource plainAccessResource) {
String netAddress = plainAccessResource.getWhiteRemoteAddress();
if (netAddress.startsWith(this.head)) {
String value;
if (index == 3) {
value = netAddress.substring(this.head.length());
} else {
value = netAddress.substring(this.head.length(), netAddress.lastIndexOf('.'));
InetAddressValidator validator = InetAddressValidator.getInstance();
if (validator.isValidInet4Address(netAddress)) {
if (netAddress.startsWith(this.head)) {
String value;
if (index == 3) {
value = netAddress.substring(this.head.length());
} else if (index == 2) {
value = netAddress.substring(this.head.length(), netAddress.lastIndexOf('.'));
} else {
value = netAddress.substring(this.head.length(), netAddress.lastIndexOf('.', netAddress.lastIndexOf('.') - 1));
}
Integer address = Integer.valueOf(value);
if (address >= this.start && address <= this.end) {
return true;
}
}
Integer address = Integer.valueOf(value);
if (address >= this.start && address <= this.end) {
return true;
} else if (validator.isValidInet6Address(netAddress)) {
netAddress = AclUtils.expandIP(netAddress, 8).toUpperCase();
if (netAddress.startsWith(this.head)) {
String value = netAddress.substring(5 * index, 5 * index + 4);
Integer address = Integer.parseInt(value, 16);
if (address >= this.start && address <= this.end) {
return true;
}
}
}
return false;
}
}
}
......@@ -46,20 +46,35 @@ public class AclUtilsTest {
addressList.add("1.1.1.3");
addressList.add("1.1.1.4");
Assert.assertEquals(newAddressList, addressList);
// IPv6 test
String ipv6Address = "1:ac41:9987::bb22:666:{1,2,3,4}";
String[] ipv6AddressArray = AclUtils.getAddreeStrArray(ipv6Address, "{1,2,3,4}");
List<String> newIPv6AddressList = new ArrayList<>();
for (String a : ipv6AddressArray) {
newIPv6AddressList.add(a);
}
List<String> ipv6AddressList = new ArrayList<>();
ipv6AddressList.add("1:ac41:9987::bb22:666:1");
ipv6AddressList.add("1:ac41:9987::bb22:666:2");
ipv6AddressList.add("1:ac41:9987::bb22:666:3");
ipv6AddressList.add("1:ac41:9987::bb22:666:4");
Assert.assertEquals(newIPv6AddressList, ipv6AddressList);
}
@Test
public void isScopeStringArray() {
String adderss = "12";
String address = "12";
for (int i = 0; i < 6; i++) {
boolean isScope = AclUtils.isScope(adderss, 4);
boolean isScope = AclUtils.isScope(address, 4);
if (i == 3) {
Assert.assertTrue(isScope);
} else {
Assert.assertFalse(isScope);
}
adderss = adderss + ".12";
address = address + ".12";
}
}
......@@ -77,6 +92,25 @@ public class AclUtilsTest {
isScope = AclUtils.isScope(adderss, 3);
Assert.assertFalse(isScope);
// IPv6 test
adderss = StringUtils.split("1050:0000:0000:0000:0005:0600:300c:326b", ":");
isScope = AclUtils.isIPv6Scope(adderss, 8);
Assert.assertTrue(isScope);
isScope = AclUtils.isIPv6Scope(adderss, 4);
Assert.assertTrue(isScope);
adderss = StringUtils.split("1050:9876:0000:0000:0005:akkg:300c:326b", ":");
isScope = AclUtils.isIPv6Scope(adderss, 8);
Assert.assertFalse(isScope);
isScope = AclUtils.isIPv6Scope(adderss, 4);
Assert.assertTrue(isScope);
adderss = StringUtils.split(AclUtils.expandIP("1050::0005:akkg:300c:326b", 8), ":");
isScope = AclUtils.isIPv6Scope(adderss, 8);
Assert.assertFalse(isScope);
isScope = AclUtils.isIPv6Scope(adderss, 4);
Assert.assertTrue(isScope);
}
@Test
......@@ -102,6 +136,18 @@ public class AclUtilsTest {
isScope = AclUtils.isScope(256);
Assert.assertFalse(isScope);
// IPv6 test
int min = Integer.parseInt("0", 16);
int max = Integer.parseInt("ffff", 16);
for (int i = min; i < max + 1; i++) {
isScope = AclUtils.isIPv6Scope(i);
Assert.assertTrue(isScope);
}
isScope = AclUtils.isIPv6Scope(-1);
Assert.assertFalse(isScope);
isScope = AclUtils.isIPv6Scope(max + 1);
Assert.assertFalse(isScope);
}
@Test
......@@ -115,10 +161,10 @@ public class AclUtilsTest {
@Test
public void isColonTest() {
boolean isColon = AclUtils.isColon(",");
boolean isColon = AclUtils.isComma(",");
Assert.assertTrue(isColon);
isColon = AclUtils.isColon("-");
isColon = AclUtils.isComma("-");
Assert.assertFalse(isColon);
}
......@@ -131,6 +177,36 @@ public class AclUtilsTest {
Assert.assertFalse(isMinus);
}
@Test
public void v6ipProcessTest() {
String remoteAddr = "5::7:6:1-200:*";
String[] strArray = StringUtils.split(remoteAddr, ":");
Assert.assertEquals(AclUtils.v6ipProcess(remoteAddr, strArray, 3), "0005:0000:0000:0000:0007:0006");
remoteAddr = "5::7:6:1-200";
strArray = StringUtils.split(remoteAddr, ":");
Assert.assertEquals(AclUtils.v6ipProcess(remoteAddr, strArray, 3), "0005:0000:0000:0000:0000:0007:0006");
remoteAddr = "5::7:6:*";
strArray = StringUtils.split(remoteAddr, ":");
Assert.assertEquals(AclUtils.v6ipProcess(remoteAddr, strArray, 3), "0005:0000:0000:0000:0000:0007:0006");
remoteAddr = "5:7:6:*";
strArray = StringUtils.split(remoteAddr, ":");
Assert.assertEquals(AclUtils.v6ipProcess(remoteAddr, strArray, 3), "0005:0007:0006");
}
@Test
public void expandIPTest() {
Assert.assertEquals(AclUtils.expandIP("::1", 8), "0000:0000:0000:0000:0000:0000:0000:0001");
Assert.assertEquals(AclUtils.expandIP("3::", 8), "0003:0000:0000:0000:0000:0000:0000:0000");
Assert.assertEquals(AclUtils.expandIP("2::2", 8), "0002:0000:0000:0000:0000:0000:0000:0002");
Assert.assertEquals(AclUtils.expandIP("4::aac4:92", 8), "0004:0000:0000:0000:0000:0000:AAC4:0092");
Assert.assertEquals(AclUtils.expandIP("ab23:56:901a::cc6:765:bb:9011", 8), "AB23:0056:901A:0000:0CC6:0765:00BB:9011");
Assert.assertEquals(AclUtils.expandIP("ab23:56:901a:1:cc6:765:bb:9011", 8), "AB23:0056:901A:0001:0CC6:0765:00BB:9011");
Assert.assertEquals(AclUtils.expandIP("5::7:6", 6), "0005:0000:0000:0000:0007:0006");
}
@SuppressWarnings("unchecked")
@Test
public void getYamlDataObjectTest() {
......@@ -140,7 +216,7 @@ public class AclUtilsTest {
}
@Test
public void writeDataObject2YamlFileTest() throws IOException{
public void writeDataObject2YamlFileTest() throws IOException {
String targetFileName = "src/test/resources/conf/plain_write_acl.yml";
File transport = new File(targetFileName);
......@@ -153,7 +229,7 @@ public class AclUtilsTest {
List<String> globalWhiteRemoteAddrs = new ArrayList<String>();
globalWhiteRemoteAddrs.add("10.10.103.*");
globalWhiteRemoteAddrs.add("192.168.0.*");
aclYamlMap.put("globalWhiteRemoteAddrs",globalWhiteRemoteAddrs);
aclYamlMap.put("globalWhiteRemoteAddrs", globalWhiteRemoteAddrs);
// For accounts element in acl yaml config file
List<Map<String, Object>> accounts = new ArrayList<Map<String, Object>>();
......@@ -166,14 +242,14 @@ public class AclUtilsTest {
}
};
accounts.add(accountsMap);
aclYamlMap.put("accounts",accounts);
aclYamlMap.put("accounts", accounts);
Assert.assertTrue(AclUtils.writeDataObject(targetFileName, aclYamlMap));
transport.delete();
}
@Test
public void updateExistedYamlFileTest() throws IOException{
public void updateExistedYamlFileTest() throws IOException {
String targetFileName = "src/test/resources/conf/plain_update_acl.yml";
File transport = new File(targetFileName);
......@@ -186,7 +262,7 @@ public class AclUtilsTest {
List<String> globalWhiteRemoteAddrs = new ArrayList<String>();
globalWhiteRemoteAddrs.add("10.10.103.*");
globalWhiteRemoteAddrs.add("192.168.0.*");
aclYamlMap.put("globalWhiteRemoteAddrs",globalWhiteRemoteAddrs);
aclYamlMap.put("globalWhiteRemoteAddrs", globalWhiteRemoteAddrs);
// Write file to yaml file
AclUtils.writeDataObject(targetFileName, aclYamlMap);
......@@ -201,7 +277,7 @@ public class AclUtilsTest {
Map<String, Object> readableMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
List<String> updatedGlobalWhiteRemoteAddrs = (List<String>) readableMap.get("globalWhiteRemoteAddrs");
Assert.assertEquals("192.168.1.2",updatedGlobalWhiteRemoteAddrs.get(0));
Assert.assertEquals("192.168.1.2", updatedGlobalWhiteRemoteAddrs.get(0));
transport.delete();
}
......@@ -235,5 +311,4 @@ public class AclUtilsTest {
Assert.assertNull(incompleteContRPCHook);
}
}
......@@ -29,6 +29,7 @@ import org.apache.rocketmq.acl.common.AclConstants;
import org.apache.rocketmq.acl.common.AclException;
import org.apache.rocketmq.acl.common.AclUtils;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.header.*;
......@@ -561,4 +562,12 @@ public class PlainAccessValidatorTest {
AclUtils.writeDataObject(targetFileName, backUpAclConfigMap);
}
@Test
public void getAllAclConfigTest(){
PlainAccessValidator plainAccessValidator = new PlainAccessValidator();
AclConfig aclConfig = plainAccessValidator.getAllAclConfig();
Assert.assertEquals(aclConfig.getGlobalWhiteAddrs().size(), 2);
Assert.assertEquals(aclConfig.getPlainAccessConfigs().size(), 2);
}
}
......@@ -39,7 +39,7 @@ public class RemoteAddressStrategyTest {
plainAccessResource.setWhiteRemoteAddress("*");
RemoteAddressStrategy remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
Assert.assertEquals(remoteAddressStrategy, RemoteAddressStrategyFactory.NULL_NET_ADDRESS_STRATEGY);
plainAccessResource.setWhiteRemoteAddress("*.*.*.*");
remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
Assert.assertEquals(remoteAddressStrategy, RemoteAddressStrategyFactory.NULL_NET_ADDRESS_STRATEGY);
......@@ -71,6 +71,35 @@ public class RemoteAddressStrategyTest {
plainAccessResource.setWhiteRemoteAddress("");
remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
Assert.assertEquals(remoteAddressStrategy.getClass(), RemoteAddressStrategyFactory.BlankRemoteAddressStrategy.class);
// IPv6 test
plainAccessResource.setWhiteRemoteAddress("*:*:*:*:*:*:*:*");
remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
Assert.assertEquals(remoteAddressStrategy, RemoteAddressStrategyFactory.NULL_NET_ADDRESS_STRATEGY);
plainAccessResource.setWhiteRemoteAddress("1050:0000:0000:0000:0005:0600:300c:326b");
remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
Assert.assertEquals(remoteAddressStrategy.getClass(), RemoteAddressStrategyFactory.OneRemoteAddressStrategy.class);
plainAccessResource.setWhiteRemoteAddress("1050::0005:0600:300c:3261,1050::0005:0600:300c:3262,1050::0005:0600:300c:3263");
remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
Assert.assertEquals(remoteAddressStrategy.getClass(), RemoteAddressStrategyFactory.MultipleRemoteAddressStrategy.class);
plainAccessResource.setWhiteRemoteAddress("1050::0005:0600:300c:3261:{1,2,3}");
remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
Assert.assertEquals(remoteAddressStrategy.getClass(), RemoteAddressStrategyFactory.MultipleRemoteAddressStrategy.class);
plainAccessResource.setWhiteRemoteAddress("1050::0005:0600:300c:3261:1-200");
remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
Assert.assertEquals(remoteAddressStrategy.getClass(), RemoteAddressStrategyFactory.RangeRemoteAddressStrategy.class);
plainAccessResource.setWhiteRemoteAddress("1050:0005:0600:300c:3261:*");
remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
Assert.assertEquals(remoteAddressStrategy.getClass(), RemoteAddressStrategyFactory.RangeRemoteAddressStrategy.class);
plainAccessResource.setWhiteRemoteAddress("1050::0005:0600:300c:3261:1-20:*");
remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
Assert.assertEquals(remoteAddressStrategy.getClass(), RemoteAddressStrategyFactory.RangeRemoteAddressStrategy.class);
}
@Test(expected = AclException.class)
......@@ -80,6 +109,8 @@ public class RemoteAddressStrategyTest {
remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
plainAccessResource.setWhiteRemoteAddress("256.0.0.1");
remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
plainAccessResource.setWhiteRemoteAddress("::1ggg");
remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
}
@Test
......@@ -94,6 +125,7 @@ public class RemoteAddressStrategyTest {
Assert.assertFalse(isMatch);
}
@Test
public void oneNetaddressStrategyTest() {
PlainAccessResource plainAccessResource = new PlainAccessResource();
plainAccessResource.setWhiteRemoteAddress("127.0.0.1");
......@@ -109,6 +141,26 @@ public class RemoteAddressStrategyTest {
plainAccessResource.setWhiteRemoteAddress("127.0.0.1");
match = remoteAddressStrategy.match(plainAccessResource);
Assert.assertTrue(match);
// Ipv6 test
plainAccessResource = new PlainAccessResource();
plainAccessResource.setWhiteRemoteAddress("::1");
remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
plainAccessResource.setWhiteRemoteAddress("");
match = remoteAddressStrategy.match(plainAccessResource);
Assert.assertFalse(match);
plainAccessResource.setWhiteRemoteAddress("::2");
match = remoteAddressStrategy.match(plainAccessResource);
Assert.assertFalse(match);
plainAccessResource.setWhiteRemoteAddress("::1");
match = remoteAddressStrategy.match(plainAccessResource);
Assert.assertTrue(match);
plainAccessResource.setWhiteRemoteAddress("0000:0000:0000:0000:0000:0000:0000:0001");
match = remoteAddressStrategy.match(plainAccessResource);
Assert.assertTrue(match);
}
@Test
......@@ -122,6 +174,21 @@ public class RemoteAddressStrategyTest {
remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
multipleNetaddressStrategyTest(remoteAddressStrategy);
plainAccessResource.setWhiteRemoteAddress("192.100-150.*.*");
remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
plainAccessResource.setWhiteRemoteAddress("192.130.0.2");
boolean match = remoteAddressStrategy.match(plainAccessResource);
Assert.assertTrue(match);
plainAccessResource = new PlainAccessResource();
plainAccessResource.setWhiteRemoteAddress("1050::0005:0600:300c:1,1050::0005:0600:300c:2,1050::0005:0600:300c:3");
remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
multipleIPv6NetaddressStrategyTest(remoteAddressStrategy);
plainAccessResource.setWhiteRemoteAddress("1050::0005:0600:300c:{1,2,3}");
remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
multipleIPv6NetaddressStrategyTest(remoteAddressStrategy);
}
@Test(expected = AclException.class)
......@@ -129,6 +196,8 @@ public class RemoteAddressStrategyTest {
PlainAccessResource plainAccessResource = new PlainAccessResource();
plainAccessResource.setWhiteRemoteAddress("127.0.0.1,2,3}");
remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
plainAccessResource.setWhiteRemoteAddress("::1,2,3}");
remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
}
private void multipleNetaddressStrategyTest(RemoteAddressStrategy remoteAddressStrategy) {
......@@ -155,6 +224,30 @@ public class RemoteAddressStrategyTest {
}
private void multipleIPv6NetaddressStrategyTest(RemoteAddressStrategy remoteAddressStrategy) {
PlainAccessResource plainAccessResource = new PlainAccessResource();
plainAccessResource.setWhiteRemoteAddress("1050:0000:0000:0000:0005:0600:300c:1");
boolean match = remoteAddressStrategy.match(plainAccessResource);
Assert.assertTrue(match);
plainAccessResource.setWhiteRemoteAddress("1050:0000:0000:0000:0005:0600:300c:2");
match = remoteAddressStrategy.match(plainAccessResource);
Assert.assertTrue(match);
plainAccessResource.setWhiteRemoteAddress("1050:0000:0000:0000:0005:0600:300c:3");
match = remoteAddressStrategy.match(plainAccessResource);
Assert.assertTrue(match);
plainAccessResource.setWhiteRemoteAddress("1050:0000:0000:0000:0005:0600:300c:4");
match = remoteAddressStrategy.match(plainAccessResource);
Assert.assertFalse(match);
plainAccessResource.setWhiteRemoteAddress("1050:0000:0000:0000:0005:0600:300c:0");
match = remoteAddressStrategy.match(plainAccessResource);
Assert.assertFalse(match);
}
@Test
public void rangeNetaddressStrategyTest() {
String head = "127.0.0.";
......@@ -162,6 +255,7 @@ public class RemoteAddressStrategyTest {
plainAccessResource.setWhiteRemoteAddress("127.0.0.1-200");
RemoteAddressStrategy remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
rangeNetaddressStrategyTest(remoteAddressStrategy, head, 1, 200, true);
plainAccessResource.setWhiteRemoteAddress("127.0.0.*");
remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
rangeNetaddressStrategyTest(remoteAddressStrategy, head, 0, 255, true);
......@@ -169,14 +263,40 @@ public class RemoteAddressStrategyTest {
plainAccessResource.setWhiteRemoteAddress("127.0.1-200.*");
remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
rangeNetaddressStrategyThirdlyTest(remoteAddressStrategy, head, 1, 200);
plainAccessResource.setWhiteRemoteAddress("127.*.*.*");
remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
rangeNetaddressStrategyThirdlyTest(remoteAddressStrategy, head, 1, 200);
rangeNetaddressStrategyTest(remoteAddressStrategy, head, 0, 255, true);
plainAccessResource.setWhiteRemoteAddress("127.1-150.*.*");
remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
rangeNetaddressStrategyThirdlyTest(remoteAddressStrategy, head, 1, 200);
// IPv6 test
head = "1050::0005:0600:300c:";
plainAccessResource = new PlainAccessResource();
plainAccessResource.setWhiteRemoteAddress("1050::0005:0600:300c:1-200");
remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
rangeIPv6NetaddressStrategyTest(remoteAddressStrategy, head, "1", "200", true);
plainAccessResource.setWhiteRemoteAddress("1050::0005:0600:300c:*");
remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
rangeIPv6NetaddressStrategyTest(remoteAddressStrategy, head, "0", "ffff", true);
plainAccessResource.setWhiteRemoteAddress("1050::0005:0600:3001:*");
remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
rangeIPv6NetaddressStrategyTest(remoteAddressStrategy, head, "0", "ffff", false);
head = "1050::0005:0600:300c:1:";
plainAccessResource.setWhiteRemoteAddress("1050::0005:0600:300c:1-200:*");
remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
rangeIPv6NetaddressStrategyTest(remoteAddressStrategy, head, "0", "ffff", true);
head = "1050::0005:0600:300c:201:";
plainAccessResource.setWhiteRemoteAddress("1050::0005:0600:300c:1-200:*");
remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
rangeIPv6NetaddressStrategyTest(remoteAddressStrategy, head, "0", "ffff", false);
}
private void rangeNetaddressStrategyTest(RemoteAddressStrategy remoteAddressStrategy, String head, int start,
......@@ -206,6 +326,25 @@ public class RemoteAddressStrategyTest {
}
}
private void rangeIPv6NetaddressStrategyTest(RemoteAddressStrategy remoteAddressStrategy, String head, String start,
String end,
boolean isFalse) {
PlainAccessResource plainAccessResource = new PlainAccessResource();
for (int i = -10; i < 65536 + 100; i++) {
String hex = Integer.toHexString(i);
plainAccessResource.setWhiteRemoteAddress(head + hex);
boolean match = remoteAddressStrategy.match(plainAccessResource);
int startNum = Integer.parseInt(start, 16);
int endNum = Integer.parseInt(end, 16);
if (isFalse && i >= startNum && i <= endNum) {
Assert.assertTrue(match);
continue;
}
Assert.assertFalse(match);
}
}
@Test(expected = AclException.class)
public void rangeNetaddressStrategyExceptionStartGreaterEndTest() {
rangeNetaddressStrategyExceptionTest("127.0.0.2-1");
......
......@@ -13,7 +13,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.5.2</version>
<version>4.6.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -42,6 +42,7 @@ import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.OffsetWrapper;
......@@ -52,6 +53,8 @@ import org.apache.rocketmq.common.protocol.header.CreateAccessConfigRequestHeade
import org.apache.rocketmq.common.protocol.header.DeleteAccessConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHeader;
import org.apache.rocketmq.common.protocol.header.UpdateGlobalWhiteAddrsConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetBrokerClusterAclConfigResponseHeader;
import org.apache.rocketmq.common.protocol.header.GetBrokerClusterAclConfigResponseBody;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.message.MessageAccessor;
......@@ -227,6 +230,8 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
return updateGlobalWhiteAddrsConfig(ctx, request);
case RequestCode.RESUME_CHECK_HALF_MESSAGE:
return resumeCheckHalfMessage(ctx, request);
case RequestCode.GET_BROKER_CLUSTER_ACL_CONFIG:
return getBrokerClusterAclConfig(ctx, request);
default:
break;
}
......@@ -429,6 +434,27 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
return null;
}
private RemotingCommand getBrokerClusterAclConfig(ChannelHandlerContext ctx, RemotingCommand request) {
final RemotingCommand response = RemotingCommand.createResponseCommand(GetBrokerClusterAclConfigResponseHeader.class);
try {
AccessValidator accessValidator = this.brokerController.getAccessValidatorMap().get(PlainAccessValidator.class);
GetBrokerClusterAclConfigResponseBody body = new GetBrokerClusterAclConfigResponseBody();
AclConfig aclConfig = accessValidator.getAllAclConfig();
body.setGlobalWhiteAddrs(aclConfig.getGlobalWhiteAddrs());
body.setPlainAccessConfigs(aclConfig.getPlainAccessConfigs());
response.setCode(ResponseCode.SUCCESS);
response.setBody(body.encode());
response.setRemark(null);
return response;
} catch (Exception e) {
log.error("Failed to generate a proper getBrokerClusterAclConfig response", e);
}
return null;
}
private RemotingCommand getAllTopicConfig(ChannelHandlerContext ctx, RemotingCommand request) {
final RemotingCommand response = RemotingCommand.createResponseCommand(GetAllTopicConfigResponseHeader.class);
// final GetAllTopicConfigResponseHeader responseHeader =
......
......@@ -41,8 +41,6 @@ import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.ResponseCode;
......@@ -52,7 +50,10 @@ import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.topic.OffsetMovedEvent;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
......@@ -495,7 +496,21 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
for (ByteBuffer bb : messageBufferList) {
byteBuffer.put(bb);
storeTimestamp = bb.getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION);
int sysFlag = bb.getInt(MessageDecoder.SYSFLAG_POSITION);
// bornhost has the IPv4 ip if the MessageSysFlag.BORNHOST_V6_FLAG bit of sysFlag is 0
// IPv4 host = ip(4 byte) + port(4 byte); IPv6 host = ip(16 byte) + port(4 byte)
int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;
int msgStoreTimePos = 4 // 1 TOTALSIZE
+ 4 // 2 MAGICCODE
+ 4 // 3 BODYCRC
+ 4 // 4 QUEUEID
+ 4 // 5 FLAG
+ 8 // 6 QUEUEOFFSET
+ 8 // 7 PHYSICALOFFSET
+ 4 // 8 SYSFLAG
+ 8 // 9 BORNTIMESTAMP
+ bornhostLength; // 10 BORNHOST
storeTimestamp = bb.getLong(msgStoreTimePos);
}
} finally {
getMessageResult.release();
......
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.5.2</version>
<version>4.6.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -16,7 +16,9 @@
*/
package org.apache.rocketmq.client;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.UtilAll;
......@@ -95,7 +97,6 @@ public class ClientConfig {
}
}
public String withNamespace(String resource) {
return NamespaceUtil.wrapNamespace(this.getNamespace(), resource);
}
......@@ -124,9 +125,21 @@ public class ClientConfig {
if (StringUtils.isEmpty(this.getNamespace())) {
return queue;
}
return new MessageQueue(withNamespace(queue.getTopic()), queue.getBrokerName(), queue.getQueueId());
}
public Collection<MessageQueue> queuesWithNamespace(Collection<MessageQueue> queues) {
if (StringUtils.isEmpty(this.getNamespace())) {
return queues;
}
Iterator<MessageQueue> iter = queues.iterator();
while (iter.hasNext()) {
MessageQueue queue = iter.next();
queue.setTopic(withNamespace(queue.getTopic()));
}
return queues;
}
public void resetClientConfig(final ClientConfig cc) {
this.namesrvAddr = cc.namesrvAddr;
this.clientIP = cc.clientIP;
......@@ -170,6 +183,7 @@ public class ClientConfig {
/**
* Domain name mode access way does not support the delimiter(;), and only one domain name can be set.
*
* @param namesrvAddr name server address
*/
public void setNamesrvAddr(String namesrvAddr) {
......
......@@ -53,14 +53,17 @@ public class Validators {
if (UtilAll.isBlank(group)) {
throw new MQClientException("the specified group is blank", null);
}
if (group.length() > CHARACTER_MAX_LENGTH) {
throw new MQClientException("the specified group is longer than group max length 255.", null);
}
if (!regularExpressionMatcher(group, PATTERN)) {
throw new MQClientException(String.format(
"the specified group[%s] contains illegal characters, allowing only %s", group,
VALID_PATTERN_STR), null);
}
if (group.length() > CHARACTER_MAX_LENGTH) {
throw new MQClientException("the specified group is longer than group max length 255.", null);
}
}
/**
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.consumer;
import java.util.Collection;
import java.util.List;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.RPCHook;
public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer {
private final DefaultLitePullConsumerImpl defaultLitePullConsumerImpl;
/**
* Consumers belonging to the same consumer group share a group id. The consumers in a group then
* divides the topic as fairly amongst themselves as possible by establishing that each queue is only
* consumed by a single consumer from the group. If all consumers are from the same group, it functions
* as a traditional message queue. Each message would be consumed by one consumer of the group only.
* When multiple consumer groups exist, the flow of the data consumption model aligns with the traditional
* publish-subscribe model. The messages are broadcast to all consumer groups.
*/
private String consumerGroup;
/**
* Long polling mode, the Consumer connection max suspend time, it is not recommended to modify
*/
private long brokerSuspendMaxTimeMillis = 1000 * 20;
/**
* Long polling mode, the Consumer connection timeout(must greater than brokerSuspendMaxTimeMillis), it is not
* recommended to modify
*/
private long consumerTimeoutMillisWhenSuspend = 1000 * 30;
/**
* The socket timeout in milliseconds
*/
private long consumerPullTimeoutMillis = 1000 * 10;
/**
* Consumption pattern,default is clustering
*/
private MessageModel messageModel = MessageModel.CLUSTERING;
/**
* Message queue listener
*/
private MessageQueueListener messageQueueListener;
/**
* Offset Storage
*/
private OffsetStore offsetStore;
/**
* Queue allocation algorithm
*/
private AllocateMessageQueueStrategy allocateMessageQueueStrategy = new AllocateMessageQueueAveragely();
/**
* Whether the unit of subscription group
*/
private boolean unitMode = false;
/**
* The flag for auto commit offset
*/
private boolean autoCommit = true;
/**
* Pull thread number
*/
private int pullThreadNums = 20;
/**
* Maximum commit offset interval time in milliseconds.
*/
private long autoCommitIntervalMillis = 5 * 1000;
/**
* Maximum number of messages pulled each time.
*/
private int pullBatchSize = 10;
/**
* Flow control threshold for consume request, each consumer will cache at most 10000 consume requests by default.
* Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit
*/
private long pullThresholdForAll = 10000;
/**
* Consume max span offset.
*/
private int consumeMaxSpan = 2000;
/**
* Flow control threshold on queue level, each message queue will cache at most 1000 messages by default, Consider
* the {@code pullBatchSize}, the instantaneous value may exceed the limit
*/
private int pullThresholdForQueue = 1000;
/**
* Limit the cached message size on queue level, each message queue will cache at most 100 MiB messages by default,
* Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit
*
* <p>
* The size of a message only measured by message body, so it's not accurate
*/
private int pullThresholdSizeForQueue = 100;
/**
* The poll timeout in milliseconds
*/
private long pollTimeoutMillis = 1000 * 5;
/**
* Interval time in in milliseconds for checking changes in topic metadata.
*/
private long topicMetadataCheckIntervalMillis = 30 * 1000;
/**
* Default constructor.
*/
public DefaultLitePullConsumer() {
this(null, MixAll.DEFAULT_CONSUMER_GROUP, null);
}
/**
* Constructor specifying consumer group.
*
* @param consumerGroup Consumer group.
*/
public DefaultLitePullConsumer(final String consumerGroup) {
this(null, consumerGroup, null);
}
/**
* Constructor specifying RPC hook.
*
* @param rpcHook RPC hook to execute before each remoting command.
*/
public DefaultLitePullConsumer(RPCHook rpcHook) {
this(null, MixAll.DEFAULT_CONSUMER_GROUP, rpcHook);
}
/**
* Constructor specifying consumer group, RPC hook
*
* @param consumerGroup Consumer group.
* @param rpcHook RPC hook to execute before each remoting command.
*/
public DefaultLitePullConsumer(final String consumerGroup, RPCHook rpcHook) {
this(null, consumerGroup, rpcHook);
}
/**
* Constructor specifying namespace, consumer group and RPC hook.
*
* @param consumerGroup Consumer group.
* @param rpcHook RPC hook to execute before each remoting command.
*/
public DefaultLitePullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) {
this.namespace = namespace;
this.consumerGroup = consumerGroup;
defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook);
}
@Override
public void start() throws MQClientException {
this.defaultLitePullConsumerImpl.start();
}
@Override
public void shutdown() {
this.defaultLitePullConsumerImpl.shutdown();
}
@Override
public void subscribe(String topic, String subExpression) throws MQClientException {
this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), subExpression);
}
@Override
public void subscribe(String topic, MessageSelector messageSelector) throws MQClientException {
this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), messageSelector);
}
@Override
public void unsubscribe(String topic) {
this.defaultLitePullConsumerImpl.unsubscribe(withNamespace(topic));
}
@Override
public void assign(Collection<MessageQueue> messageQueues) {
defaultLitePullConsumerImpl.assign(queuesWithNamespace(messageQueues));
}
@Override
public List<MessageExt> poll() {
return defaultLitePullConsumerImpl.poll(this.getPollTimeoutMillis());
}
@Override
public List<MessageExt> poll(long timeout) {
return defaultLitePullConsumerImpl.poll(timeout);
}
@Override
public void seek(MessageQueue messageQueue, long offset) throws MQClientException {
this.defaultLitePullConsumerImpl.seek(queueWithNamespace(messageQueue), offset);
}
@Override
public void pause(Collection<MessageQueue> messageQueues) {
this.defaultLitePullConsumerImpl.pause(queuesWithNamespace(messageQueues));
}
@Override
public void resume(Collection<MessageQueue> messageQueues) {
this.defaultLitePullConsumerImpl.resume(queuesWithNamespace(messageQueues));
}
@Override
public Collection<MessageQueue> fetchMessageQueues(String topic) throws MQClientException {
return this.defaultLitePullConsumerImpl.fetchMessageQueues(withNamespace(topic));
}
@Override
public Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp) throws MQClientException {
return this.defaultLitePullConsumerImpl.searchOffset(queueWithNamespace(messageQueue), timestamp);
}
@Override
public void registerTopicMessageQueueChangeListener(String topic,
TopicMessageQueueChangeListener topicMessageQueueChangeListener) throws MQClientException {
this.defaultLitePullConsumerImpl.registerTopicMessageQueueChangeListener(withNamespace(topic), topicMessageQueueChangeListener);
}
@Override
public void commitSync() {
this.defaultLitePullConsumerImpl.commitSync();
}
@Override
public Long committed(MessageQueue messageQueue) throws MQClientException {
return this.defaultLitePullConsumerImpl.committed(messageQueue);
}
@Override
public boolean isAutoCommit() {
return autoCommit;
}
@Override
public void setAutoCommit(boolean autoCommit) {
this.autoCommit = autoCommit;
}
public int getPullThreadNums() {
return pullThreadNums;
}
public void setPullThreadNums(int pullThreadNums) {
this.pullThreadNums = pullThreadNums;
}
public long getAutoCommitIntervalMillis() {
return autoCommitIntervalMillis;
}
public void setAutoCommitIntervalMillis(long autoCommitIntervalMillis) {
this.autoCommitIntervalMillis = autoCommitIntervalMillis;
}
public int getPullBatchSize() {
return pullBatchSize;
}
public void setPullBatchSize(int pullBatchSize) {
this.pullBatchSize = pullBatchSize;
}
public long getPullThresholdForAll() {
return pullThresholdForAll;
}
public void setPullThresholdForAll(long pullThresholdForAll) {
this.pullThresholdForAll = pullThresholdForAll;
}
public int getConsumeMaxSpan() {
return consumeMaxSpan;
}
public void setConsumeMaxSpan(int consumeMaxSpan) {
this.consumeMaxSpan = consumeMaxSpan;
}
public int getPullThresholdForQueue() {
return pullThresholdForQueue;
}
public void setPullThresholdForQueue(int pullThresholdForQueue) {
this.pullThresholdForQueue = pullThresholdForQueue;
}
public int getPullThresholdSizeForQueue() {
return pullThresholdSizeForQueue;
}
public void setPullThresholdSizeForQueue(int pullThresholdSizeForQueue) {
this.pullThresholdSizeForQueue = pullThresholdSizeForQueue;
}
public AllocateMessageQueueStrategy getAllocateMessageQueueStrategy() {
return allocateMessageQueueStrategy;
}
public void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
}
public long getBrokerSuspendMaxTimeMillis() {
return brokerSuspendMaxTimeMillis;
}
public long getPollTimeoutMillis() {
return pollTimeoutMillis;
}
public void setPollTimeoutMillis(long pollTimeoutMillis) {
this.pollTimeoutMillis = pollTimeoutMillis;
}
public OffsetStore getOffsetStore() {
return offsetStore;
}
public void setOffsetStore(OffsetStore offsetStore) {
this.offsetStore = offsetStore;
}
public boolean isUnitMode() {
return unitMode;
}
public void setUnitMode(boolean isUnitMode) {
this.unitMode = isUnitMode;
}
public MessageModel getMessageModel() {
return messageModel;
}
public void setMessageModel(MessageModel messageModel) {
this.messageModel = messageModel;
}
public String getConsumerGroup() {
return consumerGroup;
}
public MessageQueueListener getMessageQueueListener() {
return messageQueueListener;
}
public void setMessageQueueListener(MessageQueueListener messageQueueListener) {
this.messageQueueListener = messageQueueListener;
}
public long getConsumerPullTimeoutMillis() {
return consumerPullTimeoutMillis;
}
public void setConsumerPullTimeoutMillis(long consumerPullTimeoutMillis) {
this.consumerPullTimeoutMillis = consumerPullTimeoutMillis;
}
public long getConsumerTimeoutMillisWhenSuspend() {
return consumerTimeoutMillisWhenSuspend;
}
public void setConsumerTimeoutMillisWhenSuspend(long consumerTimeoutMillisWhenSuspend) {
this.consumerTimeoutMillisWhenSuspend = consumerTimeoutMillisWhenSuspend;
}
public long getTopicMetadataCheckIntervalMillis() {
return topicMetadataCheckIntervalMillis;
}
public void setTopicMetadataCheckIntervalMillis(long topicMetadataCheckIntervalMillis) {
this.topicMetadataCheckIntervalMillis = topicMetadataCheckIntervalMillis;
}
}
......@@ -35,9 +35,13 @@ import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
/**
* Default pulling consumer
* Default pulling consumer.
* This class will be removed in 2022, and a better implementation {@link DefaultLitePullConsumer} is recommend to use
* in the scenario of actively pulling messages.
*/
@Deprecated
public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsumer {
protected final transient DefaultMQPullConsumerImpl defaultMQPullConsumerImpl;
/**
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.consumer;
import java.util.Collection;
import java.util.List;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
public interface LitePullConsumer {
/**
* Start the consumer
*/
void start() throws MQClientException;
/**
* Shutdown the consumer
*/
void shutdown();
/**
* Subscribe some topic with subExpression
*
* @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br> if
* null or * expression,meaning subscribe all
* @throws MQClientException if there is any client error.
*/
void subscribe(final String topic, final String subExpression) throws MQClientException;
/**
* Subscribe some topic with selector.
*
* @param selector message selector({@link MessageSelector}), can be null.
* @throws MQClientException if there is any client error.
*/
void subscribe(final String topic, final MessageSelector selector) throws MQClientException;
/**
* Unsubscribe consumption some topic
*
* @param topic Message topic that needs to be unsubscribe.
*/
void unsubscribe(final String topic);
/**
* Manually assign a list of message queues to this consumer. This interface does not allow for incremental
* assignment and will replace the previous assignment (if there is one).
*
* @param messageQueues Message queues that needs to be assigned.
*/
void assign(Collection<MessageQueue> messageQueues);
/**
* Fetch data for the topics or partitions specified using assign API
*
* @return list of message, can be null.
*/
List<MessageExt> poll();
/**
* Fetch data for the topics or partitions specified using assign API
*
* @param timeout The amount time, in milliseconds, spent waiting in poll if data is not available. Must not be
* negative
* @return list of message, can be null.
*/
List<MessageExt> poll(long timeout);
/**
* Overrides the fetch offsets that the consumer will use on the next poll. If this API is invoked for the same
* message queue more than once, the latest offset will be used on the next poll(). Note that you may lose data if
* this API is arbitrarily used in the middle of consumption.
*
* @param messageQueue
* @param offset
*/
void seek(MessageQueue messageQueue, long offset) throws MQClientException;
/**
* Suspend pulling from the requested message queues.
*
* Because of the implementation of pre-pull, fetch data in {@link #poll()} will not stop immediately until the
* messages of the requested message queues drain.
*
* Note that this method does not affect message queue subscription. In particular, it does not cause a group
* rebalance.
*
* @param messageQueues Message queues that needs to be paused.
*/
void pause(Collection<MessageQueue> messageQueues);
/**
* Resume specified message queues which have been paused with {@link #pause(Collection)}.
*
* @param messageQueues Message queues that needs to be resumed.
*/
void resume(Collection<MessageQueue> messageQueues);
/**
* Whether to enable auto-commit consume offset.
*
* @return true if enable auto-commit, false if disable auto-commit.
*/
boolean isAutoCommit();
/**
* Set whether to enable auto-commit consume offset.
*
* @param autoCommit Whether to enable auto-commit.
*/
void setAutoCommit(boolean autoCommit);
/**
* Get metadata about the message queues for a given topic.
*
* @param topic The topic that need to get metadata.
* @return collection of message queues
* @throws MQClientException if there is any client error.
*/
Collection<MessageQueue> fetchMessageQueues(String topic) throws MQClientException;
/**
* Look up the offsets for the given message queue by timestamp. The returned offset for each message queue is the
* earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding message
* queue.
*
* @param messageQueue Message queues that needs to get offset by timestamp.
* @param timestamp
* @return offset
* @throws MQClientException if there is any client error.
*/
Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp) throws MQClientException;
/**
* Manually commit consume offset.
*/
void commitSync();
/**
* Get the last committed offset for the given message queue.
*
* @param messageQueue
* @return offset, if offset equals -1 means no offset in broker.
* @throws MQClientException if there is any client error.
*/
Long committed(MessageQueue messageQueue) throws MQClientException;
/**
* Register a callback for sensing topic metadata changes.
*
* @param topic The topic that need to monitor.
* @param topicMessageQueueChangeListener Callback when topic metadata changes, refer {@link
* TopicMessageQueueChangeListener}
* @throws MQClientException if there is any client error.
*/
void registerTopicMessageQueueChangeListener(String topic,
TopicMessageQueueChangeListener topicMessageQueueChangeListener) throws MQClientException;
}
......@@ -169,4 +169,5 @@ public interface MQPullConsumer extends MQConsumer {
*/
void sendMessageBack(MessageExt msg, int delayLevel, String brokerName, String consumerGroup)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
}
......@@ -32,7 +32,9 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
/**
* Schedule service for pull consumer
* Schedule service for pull consumer.
* This Consumer will be removed in 2022, and a better implementation {@link
* DefaultLitePullConsumer} is recommend to use in the scenario of actively pulling messages.
*/
public class MQPullConsumerScheduleService {
private final InternalLogger log = ClientLogger.getLog();
......@@ -157,7 +159,7 @@ public class MQPullConsumerScheduleService {
}
}
class PullTaskImpl implements Runnable {
public class PullTaskImpl implements Runnable {
private final MessageQueue messageQueue;
private volatile boolean cancelled = false;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.consumer;
import java.util.Set;
import org.apache.rocketmq.common.message.MessageQueue;
public interface TopicMessageQueueChangeListener {
/**
* This method will be invoked in the condition of queue numbers changed, These scenarios occur when the topic is
* expanded or shrunk.
*
* @param messageQueues
*/
void onChanged(String topic, Set<MessageQueue> messageQueues);
}
......@@ -117,25 +117,24 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
return;
final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();
if (!mqs.isEmpty()) {
for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
MessageQueue mq = entry.getKey();
AtomicLong offset = entry.getValue();
if (offset != null) {
if (mqs.contains(mq)) {
try {
this.updateConsumeOffsetToBroker(mq, offset.get());
log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
this.groupName,
this.mQClientFactory.getClientId(),
mq,
offset.get());
} catch (Exception e) {
log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
}
} else {
unusedMQ.add(mq);
for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
MessageQueue mq = entry.getKey();
AtomicLong offset = entry.getValue();
if (offset != null) {
if (mqs.contains(mq)) {
try {
this.updateConsumeOffsetToBroker(mq, offset.get());
log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
this.groupName,
this.mQClientFactory.getClientId(),
mq,
offset.get());
} catch (Exception e) {
log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
}
} else {
unusedMQ.add(mq);
}
}
}
......@@ -187,8 +186,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
}
/**
* Update the Consumer Offset in one way, once the Master is off, updated to Slave,
* here need to be optimized.
* Update the Consumer Offset in one way, once the Master is off, updated to Slave, here need to be optimized.
*/
private void updateConsumeOffsetToBroker(MessageQueue mq, long offset) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException {
......@@ -196,15 +194,13 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
}
/**
* Update the Consumer Offset synchronously, once the Master is off, updated to Slave,
* here need to be optimized.
* Update the Consumer Offset synchronously, once the Master is off, updated to Slave, here need to be optimized.
*/
@Override
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException {
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
}
......
......@@ -49,6 +49,7 @@ import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.TopicStatsTable;
......@@ -96,6 +97,7 @@ import org.apache.rocketmq.common.protocol.header.DeleteSubscriptionGroupRequest
import org.apache.rocketmq.common.protocol.header.DeleteTopicRequestHeader;
import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHeader;
import org.apache.rocketmq.common.protocol.header.GetBrokerClusterAclConfigResponseBody;
import org.apache.rocketmq.common.protocol.header.GetConsumeStatsInBrokerHeader;
import org.apache.rocketmq.common.protocol.header.GetConsumeStatsRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetConsumerConnectionListRequestHeader;
......@@ -390,6 +392,30 @@ public class MQClientAPIImpl {
}
public AclConfig getBrokerClusterConfig(final String addr, final long timeoutMillis) throws RemotingCommandException, InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_ACL_CONFIG, null);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
if (response.getBody() != null) {
GetBrokerClusterAclConfigResponseBody body =
GetBrokerClusterAclConfigResponseBody.decode(response.getBody(), GetBrokerClusterAclConfigResponseBody.class);
AclConfig aclConfig = new AclConfig();
aclConfig.setGlobalWhiteAddrs(body.getGlobalWhiteAddrs());
aclConfig.setPlainAccessConfigs(body.getPlainAccessConfigs());
return aclConfig;
}
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark());
}
public SendResult sendMessage(
final String addr,
final String brokerName,
......
......@@ -40,11 +40,11 @@ public class MQClientManager {
return instance;
}
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig) {
return getAndCreateMQClientInstance(clientConfig, null);
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig) {
return getOrCreateMQClientInstance(clientConfig, null);
}
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
String clientId = clientConfig.buildMQClientId();
MQClientInstance instance = this.factoryTable.get(clientId);
if (null == instance) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.impl.consumer;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.message.MessageQueue;
public class AssignedMessageQueue {
private final ConcurrentHashMap<MessageQueue, MessageQueueState> assignedMessageQueueState;
private RebalanceImpl rebalanceImpl;
public AssignedMessageQueue() {
assignedMessageQueueState = new ConcurrentHashMap<MessageQueue, MessageQueueState>();
}
public void setRebalanceImpl(RebalanceImpl rebalanceImpl) {
this.rebalanceImpl = rebalanceImpl;
}
public Set<MessageQueue> messageQueues() {
return assignedMessageQueueState.keySet();
}
public boolean isPaused(MessageQueue messageQueue) {
MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
if (messageQueueState != null) {
return messageQueueState.isPaused();
}
return true;
}
public void pause(Collection<MessageQueue> messageQueues) {
for (MessageQueue messageQueue : messageQueues) {
MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
if (assignedMessageQueueState.get(messageQueue) != null) {
messageQueueState.setPaused(true);
}
}
}
public void resume(Collection<MessageQueue> messageQueueCollection) {
for (MessageQueue messageQueue : messageQueueCollection) {
MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
if (assignedMessageQueueState.get(messageQueue) != null) {
messageQueueState.setPaused(false);
}
}
}
public ProcessQueue getProcessQueue(MessageQueue messageQueue) {
MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
if (messageQueueState != null) {
return messageQueueState.getProcessQueue();
}
return null;
}
public long getPullOffset(MessageQueue messageQueue) {
MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
if (messageQueueState != null) {
return messageQueueState.getPullOffset();
}
return -1;
}
public void updatePullOffset(MessageQueue messageQueue, long offset) {
MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
if (messageQueueState != null) {
messageQueueState.setPullOffset(offset);
}
}
public long getConusmerOffset(MessageQueue messageQueue) {
MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
if (messageQueueState != null) {
return messageQueueState.getConsumeOffset();
}
return -1;
}
public void updateConsumeOffset(MessageQueue messageQueue, long offset) {
MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
if (messageQueueState != null) {
messageQueueState.setConsumeOffset(offset);
}
}
public void setSeekOffset(MessageQueue messageQueue, long offset) {
MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
if (messageQueueState != null) {
messageQueueState.setSeekOffset(offset);
}
}
public long getSeekOffset(MessageQueue messageQueue) {
MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
if (messageQueueState != null) {
return messageQueueState.getSeekOffset();
}
return -1;
}
public void updateAssignedMessageQueue(String topic, Collection<MessageQueue> assigned) {
synchronized (this.assignedMessageQueueState) {
Iterator<Map.Entry<MessageQueue, MessageQueueState>> it = this.assignedMessageQueueState.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<MessageQueue, MessageQueueState> next = it.next();
if (next.getKey().getTopic().equals(topic)) {
if (!assigned.contains(next.getKey())) {
next.getValue().getProcessQueue().setDropped(true);
it.remove();
}
}
}
addAssignedMessageQueue(assigned);
}
}
public void updateAssignedMessageQueue(Collection<MessageQueue> assigned) {
synchronized (this.assignedMessageQueueState) {
Iterator<Map.Entry<MessageQueue, MessageQueueState>> it = this.assignedMessageQueueState.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<MessageQueue, MessageQueueState> next = it.next();
if (!assigned.contains(next.getKey())) {
next.getValue().getProcessQueue().setDropped(true);
it.remove();
}
}
addAssignedMessageQueue(assigned);
}
}
private void addAssignedMessageQueue(Collection<MessageQueue> assigned) {
for (MessageQueue messageQueue : assigned) {
if (!this.assignedMessageQueueState.containsKey(messageQueue)) {
MessageQueueState messageQueueState;
if (rebalanceImpl != null && rebalanceImpl.getProcessQueueTable().get(messageQueue) != null) {
messageQueueState = new MessageQueueState(messageQueue, rebalanceImpl.getProcessQueueTable().get(messageQueue));
} else {
ProcessQueue processQueue = new ProcessQueue();
messageQueueState = new MessageQueueState(messageQueue, processQueue);
}
this.assignedMessageQueueState.put(messageQueue, messageQueueState);
}
}
}
public void removeAssignedMessageQueue(String topic) {
synchronized (this.assignedMessageQueueState) {
Iterator<Map.Entry<MessageQueue, MessageQueueState>> it = this.assignedMessageQueueState.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<MessageQueue, MessageQueueState> next = it.next();
if (next.getKey().getTopic().equals(topic)) {
it.remove();
}
}
}
}
private class MessageQueueState {
private MessageQueue messageQueue;
private ProcessQueue processQueue;
private volatile boolean paused = false;
private volatile long pullOffset = -1;
private volatile long consumeOffset = -1;
private volatile long seekOffset = -1;
private MessageQueueState(MessageQueue messageQueue, ProcessQueue processQueue) {
this.messageQueue = messageQueue;
this.processQueue = processQueue;
}
public MessageQueue getMessageQueue() {
return messageQueue;
}
public void setMessageQueue(MessageQueue messageQueue) {
this.messageQueue = messageQueue;
}
public boolean isPaused() {
return paused;
}
public void setPaused(boolean paused) {
this.paused = paused;
}
public long getPullOffset() {
return pullOffset;
}
public void setPullOffset(long pullOffset) {
this.pullOffset = pullOffset;
}
public ProcessQueue getProcessQueue() {
return processQueue;
}
public void setProcessQueue(ProcessQueue processQueue) {
this.processQueue = processQueue;
}
public long getConsumeOffset() {
return consumeOffset;
}
public void setConsumeOffset(long consumeOffset) {
this.consumeOffset = consumeOffset;
}
public long getSeekOffset() {
return seekOffset;
}
public void setSeekOffset(long seekOffset) {
this.seekOffset = seekOffset;
}
}
}
......@@ -66,6 +66,11 @@ import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
/**
* This class will be removed in 2022, and a better implementation {@link DefaultLitePullConsumerImpl} is recommend to use
* in the scenario of actively pulling messages.
*/
@Deprecated
public class DefaultMQPullConsumerImpl implements MQConsumerInner {
private final InternalLogger log = ClientLogger.getLog();
private final DefaultMQPullConsumer defaultMQPullConsumer;
......@@ -74,7 +79,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
private volatile ServiceState serviceState = ServiceState.CREATE_JUST;
private MQClientInstance mQClientFactory;
protected MQClientInstance mQClientFactory;
private PullAPIWrapper pullAPIWrapper;
private OffsetStore offsetStore;
private RebalanceImpl rebalanceImpl = new RebalancePullImpl(this);
......@@ -629,7 +634,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
this.defaultMQPullConsumer.changeInstanceNameToPID();
}
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPullConsumer, this.rpcHook);
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPullConsumer, this.rpcHook);
this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel());
......
......@@ -579,7 +579,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
......
......@@ -26,6 +26,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.logging.InternalLogger;
......@@ -431,4 +432,5 @@ public class ProcessQueue {
public void setLastConsumeTimestamp(long lastConsumeTimestamp) {
this.lastConsumeTimestamp = lastConsumeTimestamp;
}
}
......@@ -41,8 +41,10 @@ import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
/**
* Base class for rebalance algorithm
* This class will be removed in 2022, and a better implementation {@link RebalanceLitePullImpl} is recommend to use
* in the scenario of actively pulling messages.
*/
@Deprecated
public abstract class RebalanceImpl {
protected static final InternalLogger log = ClientLogger.getLog();
protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.impl.consumer;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;
import java.util.Set;
public class RebalanceLitePullImpl extends RebalanceImpl {
private final DefaultLitePullConsumerImpl litePullConsumerImpl;
public RebalanceLitePullImpl(DefaultLitePullConsumerImpl litePullConsumerImpl) {
this(null, null, null, null, litePullConsumerImpl);
}
public RebalanceLitePullImpl(String consumerGroup, MessageModel messageModel,
AllocateMessageQueueStrategy allocateMessageQueueStrategy,
MQClientInstance mQClientFactory, DefaultLitePullConsumerImpl litePullConsumerImpl) {
super(consumerGroup, messageModel, allocateMessageQueueStrategy, mQClientFactory);
this.litePullConsumerImpl = litePullConsumerImpl;
}
@Override
public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
MessageQueueListener messageQueueListener = this.litePullConsumerImpl.getDefaultLitePullConsumer().getMessageQueueListener();
if (messageQueueListener != null) {
try {
messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
} catch (Throwable e) {
log.error("messageQueueChanged exception", e);
}
}
}
@Override
public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
this.litePullConsumerImpl.getOffsetStore().persist(mq);
this.litePullConsumerImpl.getOffsetStore().removeOffset(mq);
return true;
}
@Override
public ConsumeType consumeType() {
return ConsumeType.CONSUME_ACTIVELY;
}
@Override
public void removeDirtyOffset(final MessageQueue mq) {
this.litePullConsumerImpl.getOffsetStore().removeOffset(mq);
}
@Override
public long computePullFromWhere(MessageQueue mq) {
return 0;
}
@Override
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
}
}
......@@ -246,10 +246,6 @@ public class MQClientInstance {
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
......@@ -366,7 +362,6 @@ public class MQClientInstance {
}
/**
*
* @param offsetTable
* @param namespace
* @return newOffsetTable
......@@ -385,6 +380,7 @@ public class MQClientInstance {
return newOffsetTable;
}
/**
* Remove offline broker
*/
......@@ -676,10 +672,13 @@ public class MQClientInstance {
} else {
log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
}
} catch (Exception e) {
} catch (MQClientException e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
}
} catch (RemotingException e) {
log.error("updateTopicRouteInfoFromNameServer Exception", e);
throw new IllegalStateException(e);
} finally {
this.lockNamesrv.unlock();
}
......@@ -743,9 +742,10 @@ public class MQClientInstance {
return false;
}
/**
* This method will be removed in the version 5.0.0,because filterServer was removed,and method <code>subscribe(final String topic, final MessageSelector messageSelector)</code>
* is recommended.
* This method will be removed in the version 5.0.0,because filterServer was removed,and method
* <code>subscribe(final String topic, final MessageSelector messageSelector)</code> is recommended.
*/
@Deprecated
private void uploadFilterClassToAllFilterServer(final String consumerGroup, final String fullClassName,
......
......@@ -180,7 +180,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
this.defaultMQProducer.changeInstanceNameToPID();
}
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
......@@ -271,6 +271,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
/**
* This method will be removed in the version 5.0.0 and <code>getCheckListener</code> is recommended.
*
* @return
*/
@Override
......@@ -464,13 +465,14 @@ public class DefaultMQProducerImpl implements MQProducerInner {
* DEFAULT ASYNC -------------------------------------------------------
*/
public void send(Message msg,
SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
}
/**
* It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout.
* A new one will be provided in next version
* It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be
* provided in next version
*
* @param msg
* @param sendCallback
* @param timeout the <code>sendCallback</code> will be invoked at most time
......@@ -505,7 +507,6 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}
......@@ -514,6 +515,15 @@ public class DefaultMQProducerImpl implements MQProducerInner {
this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);
}
private void validateNameServerSetting() throws MQClientException {
List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
if (null == nsList || nsList.isEmpty()) {
throw new MQClientException(
"No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
}
}
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
......@@ -522,7 +532,6 @@ public class DefaultMQProducerImpl implements MQProducerInner {
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
final long invokeID = random.nextLong();
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
......@@ -653,13 +662,9 @@ public class DefaultMQProducerImpl implements MQProducerInner {
throw mqClientException;
}
List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
if (null == nsList || nsList.isEmpty()) {
throw new MQClientException(
"No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
}
validateNameServerSetting();
throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
......@@ -681,11 +686,11 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
......@@ -990,8 +995,9 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
/**
* It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout.
* A new one will be provided in next version
* It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be
* provided in next version
*
* @param msg
* @param mq
* @param sendCallback
......@@ -1105,6 +1111,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
}
validateNameServerSetting();
throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
}
......@@ -1117,8 +1124,9 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
/**
* It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout.
* A new one will be provided in next version
* It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be
* provided in next version
*
* @param msg
* @param selector
* @param arg
......@@ -1129,7 +1137,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
* @throws InterruptedException
*/
@Deprecated
public void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback, final long timeout)
public void send(final Message msg, final MessageQueueSelector selector, final Object arg,
final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException {
final long beginStartTime = System.currentTimeMillis();
ExecutorService executor = this.getAsyncSenderExecutor();
......@@ -1173,7 +1182,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
......
......@@ -216,7 +216,11 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
public void removeShutdownHook() {
if (shutDownHook != null) {
Runtime.getRuntime().removeShutdownHook(shutDownHook);
try {
Runtime.getRuntime().removeShutdownHook(shutDownHook);
} catch (IllegalStateException e) {
// ignore - VM is already shutting down
}
}
}
......
......@@ -54,7 +54,7 @@ import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class DefaultMQPullConsumerTest {
@Spy
private MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
@Mock
private MQClientAPIImpl mQClientAPIImpl;
private DefaultMQPullConsumer pullConsumer;
......
......@@ -59,8 +59,10 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
......@@ -73,7 +75,8 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
@RunWith(PowerMockRunner.class)
@PrepareForTest(DefaultMQPushConsumerImpl.class)
public class DefaultMQPushConsumerTest {
private String consumerGroup;
private String topic = "FooBar";
......@@ -102,10 +105,12 @@ public class DefaultMQPushConsumerTest {
});
DefaultMQPushConsumerImpl pushConsumerImpl = pushConsumer.getDefaultMQPushConsumerImpl();
PowerMockito.suppress(PowerMockito.method(DefaultMQPushConsumerImpl.class, "updateTopicSubscribeInfoWhenSubscriptionChanged"));
rebalancePushImpl = spy(new RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl()));
Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl");
field.setAccessible(true);
field.set(pushConsumerImpl, rebalancePushImpl);
pushConsumer.subscribe(topic, "*");
pushConsumer.start();
......
......@@ -39,7 +39,7 @@ import static org.mockito.Mockito.mock;
@RunWith(MockitoJUnitRunner.class)
public class MQClientInstanceTest {
private MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private String topic = "FooBar";
private String group = "FooBarGroup";
......
......@@ -66,7 +66,7 @@ import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class DefaultMQProducerTest {
@Spy
private MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
@Mock
private MQClientAPIImpl mQClientAPIImpl;
@Mock
......@@ -184,6 +184,7 @@ public class DefaultMQProducerTest {
});
countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
}
@Test
public void testSendMessageAsync() throws RemotingException, MQClientException, InterruptedException {
final AtomicInteger cc = new AtomicInteger(0);
......@@ -211,12 +212,12 @@ public class DefaultMQProducerTest {
Message message = new Message();
message.setTopic("test");
message.setBody("hello world".getBytes());
producer.send(new Message(),sendCallback);
producer.send(message,sendCallback,1000);
producer.send(message,new MessageQueue(),sendCallback);
producer.send(new Message(),new MessageQueue(),sendCallback,1000);
producer.send(new Message(),messageQueueSelector,null,sendCallback);
producer.send(message,messageQueueSelector,null,sendCallback,1000);
producer.send(new Message(), sendCallback);
producer.send(message, sendCallback, 1000);
producer.send(message, new MessageQueue(), sendCallback);
producer.send(new Message(), new MessageQueue(), sendCallback, 1000);
producer.send(new Message(), messageQueueSelector, null, sendCallback);
producer.send(message, messageQueueSelector, null, sendCallback, 1000);
countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
assertThat(cc.get()).isEqualTo(6);
......
......@@ -70,8 +70,10 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
......@@ -83,7 +85,8 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
@RunWith(PowerMockRunner.class)
@PrepareForTest(DefaultMQPushConsumerImpl.class)
public class DefaultMQConsumerWithTraceTest {
private String consumerGroup;
private String consumerGroupNormal;
......@@ -101,7 +104,6 @@ public class DefaultMQConsumerWithTraceTest {
private DefaultMQPushConsumer normalPushConsumer;
private DefaultMQPushConsumer customTraceTopicpushConsumer;
private AsyncTraceDispatcher asyncTraceDispatcher;
private MQClientInstance mQClientTraceFactory;
@Mock
......@@ -112,17 +114,16 @@ public class DefaultMQConsumerWithTraceTest {
@Before
public void init() throws Exception {
consumerGroup = "FooBarGroup" + System.currentTimeMillis();
pushConsumer = new DefaultMQPushConsumer(consumerGroup,true,"");
pushConsumer = new DefaultMQPushConsumer(consumerGroup, true, "");
consumerGroupNormal = "FooBarGroup" + System.currentTimeMillis();
normalPushConsumer = new DefaultMQPushConsumer(consumerGroupNormal,false,"");
customTraceTopicpushConsumer = new DefaultMQPushConsumer(consumerGroup,true,customerTraceTopic);
normalPushConsumer = new DefaultMQPushConsumer(consumerGroupNormal, false, "");
customTraceTopicpushConsumer = new DefaultMQPushConsumer(consumerGroup, true, customerTraceTopic);
pushConsumer.setNamesrvAddr("127.0.0.1:9876");
pushConsumer.setPullInterval(60 * 1000);
asyncTraceDispatcher = (AsyncTraceDispatcher)pushConsumer.getTraceDispatcher();
asyncTraceDispatcher = (AsyncTraceDispatcher) pushConsumer.getTraceDispatcher();
traceProducer = asyncTraceDispatcher.getTraceProducer();
pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
......@@ -131,12 +132,14 @@ public class DefaultMQConsumerWithTraceTest {
}
});
PowerMockito.suppress(PowerMockito.method(DefaultMQPushConsumerImpl.class, "updateTopicSubscribeInfoWhenSubscriptionChanged"));
DefaultMQPushConsumerImpl pushConsumerImpl = pushConsumer.getDefaultMQPushConsumerImpl();
rebalancePushImpl = spy(new RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl()));
Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl");
field.setAccessible(true);
field.set(pushConsumerImpl, rebalancePushImpl);
pushConsumer.subscribe(topic, "*");
pushConsumer.start();
mQClientFactory = spy(pushConsumerImpl.getmQClientFactory());
......
......@@ -60,7 +60,7 @@ import static org.mockito.Mockito.when;
public class DefaultMQProducerWithTraceTest {
@Spy
private MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
@Mock
private MQClientAPIImpl mQClientAPIImpl;
......@@ -87,7 +87,7 @@ public class DefaultMQProducerWithTraceTest {
producer.setNamesrvAddr("127.0.0.1:9876");
normalProducer.setNamesrvAddr("127.0.0.1:9877");
customTraceTopicproducer.setNamesrvAddr("127.0.0.1:9878");
message = new Message(topic, new byte[]{'a', 'b', 'c'});
message = new Message(topic, new byte[] {'a', 'b', 'c'});
asyncTraceDispatcher = (AsyncTraceDispatcher) producer.getTraceDispatcher();
asyncTraceDispatcher.setTraceTopicName(customerTraceTopic);
asyncTraceDispatcher.getHostProducer();
......@@ -108,14 +108,13 @@ public class DefaultMQProducerWithTraceTest {
field.setAccessible(true);
field.set(mQClientFactory, mQClientAPIImpl);
producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod();
nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod();
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
.thenReturn(createSendResult(SendStatus.SEND_OK));
nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
.thenReturn(createSendResult(SendStatus.SEND_OK));
}
......
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.5.2</version>
<version>4.6.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......@@ -41,5 +41,9 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>commons-validator</groupId>
<artifactId>commons-validator</artifactId>
</dependency>
</dependencies>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common;
import java.util.List;
public class AclConfig {
private List<String> globalWhiteAddrs;
private List<PlainAccessConfig> plainAccessConfigs;
public List<String> getGlobalWhiteAddrs() {
return globalWhiteAddrs;
}
public void setGlobalWhiteAddrs(List<String> globalWhiteAddrs) {
this.globalWhiteAddrs = globalWhiteAddrs;
}
public List<PlainAccessConfig> getPlainAccessConfigs() {
return plainAccessConfigs;
}
public void setPlainAccessConfigs(List<PlainAccessConfig> plainAccessConfigs) {
this.plainAccessConfigs = plainAccessConfigs;
}
}
......@@ -124,8 +124,10 @@ public class MixAll {
public static String brokerVIPChannel(final boolean isChange, final String brokerAddr) {
if (isChange) {
String[] ipAndPort = brokerAddr.split(":");
String brokerAddrNew = ipAndPort[0] + ":" + (Integer.parseInt(ipAndPort[1]) - 2);
int split = brokerAddr.lastIndexOf(":");
String ip = brokerAddr.substring(0, split);
String port = brokerAddr.substring(split + 1);
String brokerAddrNew = ip + ":" + (Integer.parseInt(port) - 2);
return brokerAddrNew;
} else {
return brokerAddr;
......
......@@ -23,6 +23,7 @@ import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.text.NumberFormat;
......@@ -39,6 +40,7 @@ import java.util.zip.CRC32;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.validator.routines.InetAddressValidator;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
......@@ -438,6 +440,18 @@ public class UtilAll {
return false;
}
public static boolean isInternalV6IP(byte[] ip) {
if (ip.length != 16) {
throw new RuntimeException("illegal ipv6 bytes");
}
//FEC0:0000:0000:0000:0000:0000:0000:0000/10
if (ip[0] == (byte) 254 && ip[1] >= (byte) 192) {
return true;
}
return false;
}
private static boolean ipCheck(byte[] ip) {
if (ip.length != 4) {
throw new RuntimeException("illegal ipv4 bytes");
......@@ -474,6 +488,15 @@ public class UtilAll {
return false;
}
private static boolean ipV6Check(byte[] ip) {
if (ip.length != 16) {
throw new RuntimeException("illegal ipv6 bytes");
}
InetAddressValidator validator = InetAddressValidator.getInstance();
return validator.isValidInet6Address(ipToIPv6Str(ip));
}
public static String ipToIPv4Str(byte[] ip) {
if (ip.length != 4) {
return null;
......@@ -483,6 +506,25 @@ public class UtilAll {
.append(".").append(ip[3] & 0xFF).toString();
}
public static String ipToIPv6Str(byte[] ip) {
if (ip.length != 16) {
return null;
}
StringBuilder sb = new StringBuilder();
for (int i = 0; i < ip.length; i++) {
String hex = Integer.toHexString(ip[i] & 0xFF);
if (hex.length() < 2) {
sb.append(0);
}
sb.append(hex);
if (i % 2 == 1 && i < ip.length - 1) {
sb.append(":");
}
}
return sb.toString();
}
public static byte[] getIP() {
try {
Enumeration allNetInterfaces = NetworkInterface.getNetworkInterfaces();
......@@ -504,6 +546,17 @@ public class UtilAll {
}
}
}
} else if (ip != null && ip instanceof Inet6Address) {
byte[] ipByte = ip.getAddress();
if (ipByte.length == 16) {
if (ipV6Check(ipByte)) {
if (!isInternalV6IP(ipByte)) {
return ipByte;
} else if (internalIP == null) {
internalIP = ipByte;
}
}
}
}
}
}
......@@ -532,12 +585,12 @@ public class UtilAll {
}
}
public static String List2String(List<String> list,String splitor) {
public static String List2String(List<String> list, String splitor) {
if (list == null || list.size() == 0) {
return null;
}
StringBuffer str = new StringBuffer();
for (int i = 0;i < list.size();i++) {
for (int i = 0; i < list.size(); i++) {
str.append(list.get(i));
if (i == list.size() - 1) {
continue;
......@@ -547,7 +600,7 @@ public class UtilAll {
return str.toString();
}
public static List<String> String2List(String str,String splitor) {
public static List<String> String2List(String str, String splitor) {
if (StringUtils.isEmpty(str)) {
return null;
}
......
......@@ -31,17 +31,19 @@ public class MessageClientIDSetter {
private static long nextStartTime;
static {
LEN = 4 + 2 + 4 + 4 + 2;
ByteBuffer tempBuffer = ByteBuffer.allocate(10);
tempBuffer.position(2);
tempBuffer.putInt(UtilAll.getPid());
tempBuffer.position(0);
byte[] ip;
try {
tempBuffer.put(UtilAll.getIP());
ip = UtilAll.getIP();
} catch (Exception e) {
tempBuffer.put(createFakeIP());
ip = createFakeIP();
}
tempBuffer.position(6);
LEN = ip.length + 2 + 4 + 4 + 2;
ByteBuffer tempBuffer = ByteBuffer.allocate(ip.length + 2 + 4);
tempBuffer.position(0);
tempBuffer.put(ip);
tempBuffer.position(ip.length);
tempBuffer.putInt(UtilAll.getPid());
tempBuffer.position(ip.length + 2);
tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode());
FIX_STRING = UtilAll.bytes2string(tempBuffer.array());
setStartTime(System.currentTimeMillis());
......@@ -64,11 +66,12 @@ public class MessageClientIDSetter {
public static Date getNearlyTimeFromID(String msgID) {
ByteBuffer buf = ByteBuffer.allocate(8);
byte[] bytes = UtilAll.string2bytes(msgID);
int ipLength = bytes.length == 28 ? 16 : 4;
buf.put((byte) 0);
buf.put((byte) 0);
buf.put((byte) 0);
buf.put((byte) 0);
buf.put(bytes, 10, 4);
buf.put(bytes, ipLength + 2 + 4, 4);
buf.position(0);
long spanMS = buf.getLong();
Calendar cal = Calendar.getInstance();
......@@ -89,13 +92,18 @@ public class MessageClientIDSetter {
public static String getIPStrFromID(String msgID) {
byte[] ipBytes = getIPFromID(msgID);
return UtilAll.ipToIPv4Str(ipBytes);
if (ipBytes.length == 16) {
return UtilAll.ipToIPv6Str(ipBytes);
} else {
return UtilAll.ipToIPv4Str(ipBytes);
}
}
public static byte[] getIPFromID(String msgID) {
byte[] result = new byte[4];
byte[] bytes = UtilAll.string2bytes(msgID);
System.arraycopy(bytes, 0, result, 0, 4);
int ipLength = bytes.length == 28 ? 16 : 4;
byte[] result = new byte[ipLength];
System.arraycopy(bytes, 0, result, 0, ipLength);
return result;
}
......
......@@ -16,9 +16,7 @@
*/
package org.apache.rocketmq.common.message;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
......@@ -29,37 +27,41 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
public class MessageDecoder {
public final static int MSG_ID_LENGTH = 8 + 8;
// public final static int MSG_ID_LENGTH = 8 + 8;
public final static Charset CHARSET_UTF8 = Charset.forName("UTF-8");
public final static int MESSAGE_MAGIC_CODE_POSTION = 4;
public final static int MESSAGE_FLAG_POSTION = 16;
public final static int MESSAGE_PHYSIC_OFFSET_POSTION = 28;
public final static int MESSAGE_STORE_TIMESTAMP_POSTION = 56;
// public final static int MESSAGE_STORE_TIMESTAMP_POSTION = 56;
public final static int MESSAGE_MAGIC_CODE = -626843481;
public static final char NAME_VALUE_SEPARATOR = 1;
public static final char PROPERTY_SEPARATOR = 2;
public static final int PHY_POS_POSITION = 4 + 4 + 4 + 4 + 4 + 8;
public static final int BODY_SIZE_POSITION = 4 // 1 TOTALSIZE
+ 4 // 2 MAGICCODE
+ 4 // 3 BODYCRC
+ 4 // 4 QUEUEID
+ 4 // 5 FLAG
+ 8 // 6 QUEUEOFFSET
+ 8 // 7 PHYSICALOFFSET
+ 4 // 8 SYSFLAG
+ 8 // 9 BORNTIMESTAMP
+ 8 // 10 BORNHOST
+ 8 // 11 STORETIMESTAMP
+ 8 // 12 STOREHOSTADDRESS
+ 4 // 13 RECONSUMETIMES
+ 8; // 14 Prepared Transaction Offset
public static final int PHY_POS_POSITION = 4 + 4 + 4 + 4 + 4 + 8;
public static final int SYSFLAG_POSITION = 4 + 4 + 4 + 4 + 4 + 8 + 8;
// public static final int BODY_SIZE_POSITION = 4 // 1 TOTALSIZE
// + 4 // 2 MAGICCODE
// + 4 // 3 BODYCRC
// + 4 // 4 QUEUEID
// + 4 // 5 FLAG
// + 8 // 6 QUEUEOFFSET
// + 8 // 7 PHYSICALOFFSET
// + 4 // 8 SYSFLAG
// + 8 // 9 BORNTIMESTAMP
// + 8 // 10 BORNHOST
// + 8 // 11 STORETIMESTAMP
// + 8 // 12 STOREHOSTADDRESS
// + 4 // 13 RECONSUMETIMES
// + 8; // 14 Prepared Transaction Offset
public static String createMessageId(final ByteBuffer input, final ByteBuffer addr, final long offset) {
input.flip();
input.limit(MessageDecoder.MSG_ID_LENGTH);
int msgIDLength = addr.limit() == 8 ? 16 : 28;
input.limit(msgIDLength);
input.put(addr);
input.putLong(offset);
......@@ -68,8 +70,9 @@ public class MessageDecoder {
}
public static String createMessageId(SocketAddress socketAddress, long transactionIdhashCode) {
ByteBuffer byteBuffer = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH);
InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
int msgIDLength = inetSocketAddress.getAddress() instanceof Inet4Address ? 16 : 28;
ByteBuffer byteBuffer = ByteBuffer.allocate(msgIDLength);
byteBuffer.put(inetSocketAddress.getAddress().getAddress());
byteBuffer.putInt(inetSocketAddress.getPort());
byteBuffer.putLong(transactionIdhashCode);
......@@ -80,15 +83,16 @@ public class MessageDecoder {
public static MessageId decodeMessageId(final String msgId) throws UnknownHostException {
SocketAddress address;
long offset;
int ipLength = msgId.length() == 32 ? 4 * 2 : 16 * 2;
byte[] ip = UtilAll.string2bytes(msgId.substring(0, 8));
byte[] port = UtilAll.string2bytes(msgId.substring(8, 16));
byte[] ip = UtilAll.string2bytes(msgId.substring(0, ipLength));
byte[] port = UtilAll.string2bytes(msgId.substring(ipLength, ipLength + 8));
ByteBuffer bb = ByteBuffer.wrap(port);
int portInt = bb.getInt(0);
address = new InetSocketAddress(InetAddress.getByAddress(ip), portInt);
// offset
byte[] data = UtilAll.string2bytes(msgId.substring(16, 32));
byte[] data = UtilAll.string2bytes(msgId.substring(ipLength + 8, ipLength + 8 + 16));
bb = ByteBuffer.wrap(data);
offset = bb.getLong(0);
......@@ -101,7 +105,24 @@ public class MessageDecoder {
* @param byteBuffer msg commit log buffer.
*/
public static Map<String, String> decodeProperties(java.nio.ByteBuffer byteBuffer) {
int topicLengthPosition = BODY_SIZE_POSITION + 4 + byteBuffer.getInt(BODY_SIZE_POSITION);
int sysFlag = byteBuffer.getInt(SYSFLAG_POSITION);
int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;
int storehostAddressLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 8 : 20;
int bodySizePosition = 4 // 1 TOTALSIZE
+ 4 // 2 MAGICCODE
+ 4 // 3 BODYCRC
+ 4 // 4 QUEUEID
+ 4 // 5 FLAG
+ 8 // 6 QUEUEOFFSET
+ 8 // 7 PHYSICALOFFSET
+ 4 // 8 SYSFLAG
+ 8 // 9 BORNTIMESTAMP
+ bornhostLength // 10 BORNHOST
+ 8 // 11 STORETIMESTAMP
+ storehostAddressLength // 12 STOREHOSTADDRESS
+ 4 // 13 RECONSUMETIMES
+ 8; // 14 Prepared Transaction Offset
int topicLengthPosition = bodySizePosition + 4 + byteBuffer.getInt(bodySizePosition);
byte topicLength = byteBuffer.get(topicLengthPosition);
......@@ -139,6 +160,8 @@ public class MessageDecoder {
byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8);
short propertiesLength = (short) propertiesBytes.length;
int sysFlag = messageExt.getSysFlag();
int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;
int storehostAddressLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 8 : 20;
byte[] newBody = messageExt.getBody();
if (needCompress && (sysFlag & MessageSysFlag.COMPRESSED_FLAG) == MessageSysFlag.COMPRESSED_FLAG) {
newBody = UtilAll.compress(body, 5);
......@@ -158,9 +181,9 @@ public class MessageDecoder {
+ 8 // 7 PHYSICALOFFSET
+ 4 // 8 SYSFLAG
+ 8 // 9 BORNTIMESTAMP
+ 8 // 10 BORNHOST
+ bornhostLength // 10 BORNHOST
+ 8 // 11 STORETIMESTAMP
+ 8 // 12 STOREHOSTADDRESS
+ storehostAddressLength // 12 STOREHOSTADDRESS
+ 4 // 13 RECONSUMETIMES
+ 8 // 14 Prepared Transaction Offset
+ 4 + bodyLength // 14 BODY
......@@ -291,8 +314,9 @@ public class MessageDecoder {
msgExt.setBornTimestamp(bornTimeStamp);
// 10 BORNHOST
byte[] bornHost = new byte[4];
byteBuffer.get(bornHost, 0, 4);
int bornhostIPLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 : 16;
byte[] bornHost = new byte[bornhostIPLength];
byteBuffer.get(bornHost, 0, bornhostIPLength);
int port = byteBuffer.getInt();
msgExt.setBornHost(new InetSocketAddress(InetAddress.getByAddress(bornHost), port));
......@@ -301,8 +325,9 @@ public class MessageDecoder {
msgExt.setStoreTimestamp(storeTimestamp);
// 12 STOREHOST
byte[] storeHost = new byte[4];
byteBuffer.get(storeHost, 0, 4);
int storehostIPLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 : 16;
byte[] storeHost = new byte[storehostIPLength];
byteBuffer.get(storeHost, 0, storehostIPLength);
port = byteBuffer.getInt();
msgExt.setStoreHost(new InetSocketAddress(InetAddress.getByAddress(storeHost), port));
......@@ -348,7 +373,8 @@ public class MessageDecoder {
msgExt.setProperties(map);
}
ByteBuffer byteBufferMsgId = ByteBuffer.allocate(MSG_ID_LENGTH);
int msgIDLength = storehostIPLength + 4 + 8;
ByteBuffer byteBufferMsgId = ByteBuffer.allocate(msgIDLength);
String msgId = createMessageId(byteBufferMsgId, msgExt.getStoreHostBytes(), msgExt.getCommitLogOffset());
msgExt.setMsgId(msgId);
......
......@@ -16,6 +16,8 @@
*/
package org.apache.rocketmq.common.message;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
......@@ -66,14 +68,26 @@ public class MessageExt extends Message {
public static ByteBuffer socketAddress2ByteBuffer(final SocketAddress socketAddress, final ByteBuffer byteBuffer) {
InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
byteBuffer.put(inetSocketAddress.getAddress().getAddress(), 0, 4);
InetAddress address = inetSocketAddress.getAddress();
if (address instanceof Inet4Address) {
byteBuffer.put(inetSocketAddress.getAddress().getAddress(), 0, 4);
} else {
byteBuffer.put(inetSocketAddress.getAddress().getAddress(), 0, 16);
}
byteBuffer.putInt(inetSocketAddress.getPort());
byteBuffer.flip();
return byteBuffer;
}
public static ByteBuffer socketAddress2ByteBuffer(SocketAddress socketAddress) {
ByteBuffer byteBuffer = ByteBuffer.allocate(8);
InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
InetAddress address = inetSocketAddress.getAddress();
ByteBuffer byteBuffer;
if (address instanceof Inet4Address) {
byteBuffer = ByteBuffer.allocate(4 + 4);
} else {
byteBuffer = ByteBuffer.allocate(16 + 4);
}
return socketAddress2ByteBuffer(socketAddress, byteBuffer);
}
......@@ -167,6 +181,10 @@ public class MessageExt extends Message {
this.sysFlag = sysFlag;
}
public void setStoreHostAddressV6Flag() { this.sysFlag = this.sysFlag | MessageSysFlag.STOREHOSTADDRESS_V6_FLAG; }
public void setBornHostV6Flag() { this.sysFlag = this.sysFlag | MessageSysFlag.BORNHOST_V6_FLAG; }
public int getBodyCRC() {
return bodyCRC;
}
......
......@@ -78,6 +78,8 @@ public class RequestCode {
public static final int UPDATE_GLOBAL_WHITE_ADDRS_CONFIG = 53;
public static final int GET_BROKER_CLUSTER_ACL_CONFIG = 54;
public static final int PUT_KV_CONFIG = 100;
public static final int GET_KV_CONFIG = 101;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import java.util.List;
public class GetBrokerClusterAclConfigResponseBody extends RemotingSerializable {
private List<String> globalWhiteAddrs;
private List<PlainAccessConfig> plainAccessConfigs;
public List<String> getGlobalWhiteAddrs() {
return globalWhiteAddrs;
}
public void setGlobalWhiteAddrs(List<String> globalWhiteAddrs) {
this.globalWhiteAddrs = globalWhiteAddrs;
}
public List<PlainAccessConfig> getPlainAccessConfigs() {
return plainAccessConfigs;
}
public void setPlainAccessConfigs(List<PlainAccessConfig> plainAccessConfigs) {
this.plainAccessConfigs = plainAccessConfigs;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import java.util.List;
public class GetBrokerClusterAclConfigResponseHeader implements CommandCustomHeader {
@CFNotNull
private List<PlainAccessConfig> plainAccessConfigs;
@Override
public void checkFields() throws RemotingCommandException {
}
public List<PlainAccessConfig> getPlainAccessConfigs() {
return plainAccessConfigs;
}
public void setPlainAccessConfigs(List<PlainAccessConfig> plainAccessConfigs) {
this.plainAccessConfigs = plainAccessConfigs;
}
}
......@@ -23,6 +23,8 @@ public class MessageSysFlag {
public final static int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
public final static int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
public final static int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;
public final static int BORNHOST_V6_FLAG = 0x1 << 4;
public final static int STOREHOSTADDRESS_V6_FLAG = 0x1 << 5;
public static int getTransactionValue(final int flag) {
return flag & TRANSACTION_ROLLBACK_TYPE;
......@@ -35,4 +37,5 @@ public class MessageSysFlag {
public static int clearCompressedFlag(final int flag) {
return flag & (~COMPRESSED_FLAG);
}
}
......@@ -98,6 +98,15 @@ public class UtilAllTest {
assertThat(UtilAll.isBlank("Hello")).isFalse();
}
@Test
public void testIPv6Check() {
byte[] nonInternalIp = UtilAll.string2bytes("24084004018081003FAA1DDE2B3F898A");
byte[] internalIp = UtilAll.string2bytes("FEC0000000000000000000000000FFFF");
assertThat(UtilAll.isInternalV6IP(nonInternalIp)).isFalse();
assertThat(UtilAll.isInternalV6IP(internalIp)).isTrue();
assertThat(UtilAll.ipToIPv6Str(nonInternalIp).toUpperCase()).isEqualTo("2408:4004:0180:8100:3FAA:1DDE:2B3F:898A");
}
static class DemoConfig {
private int demoWidth = 0;
private int demoLength = 0;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.message;
import java.util.Calendar;
import java.util.Date;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
public class MessageClientIDSetterTest {
@Test
public void testGetIPStrFromID() {
String ipv4HostMsgId = "C0A803CA00002A9F0000000000031367";
String ipv6HostMsgId = "24084004018081003FAA1DDE2B3F898A00002A9F0000000000000CA0";
String v4Ip = "192.168.3.202";
String v6Ip = "2408:4004:0180:8100:3faa:1dde:2b3f:898a";
assertThat(MessageClientIDSetter.getIPStrFromID(ipv4HostMsgId)).isEqualTo(v4Ip);
assertThat(MessageClientIDSetter.getIPStrFromID(ipv6HostMsgId)).isEqualTo(v6Ip);
}
}
......@@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.5.2</version>
<version>4.6.0-SNAPSHOT</version>
</parent>
<artifactId>rocketmq-distribution</artifactId>
<name>rocketmq-distribution ${project.version}</name>
......
......@@ -19,7 +19,7 @@
### 3. 样例
- [样例(Example)](RocketMQ_Example.md) :介绍RocketMQ的常见用法,包括基本样例、顺序消息样例、延时消息样例、批量消息样例、过滤消息样例、事消息样例等。
- [样例(Example)](RocketMQ_Example.md) :介绍RocketMQ的常见用法,包括基本样例、顺序消息样例、延时消息样例、批量消息样例、过滤消息样例、事消息样例等。
### 4. 最佳实践
......
......@@ -152,6 +152,18 @@ sh mqadmin clusterAclConfigVersion -n 192.168.1.2:9876 -c DefaultCluster
| c | eg:DefaultCluster | 指定集群名称(与broker地址二选一) |
| b | eg:192.168.12.134:10911 | 指定broker地址(与集群名称二选一) |
### 7.5 查询集群/Broker的ACL配置文件全部内容
该命令的示例如下:
sh mqadmin getAccessConfigSubCommand -n 192.168.1.2:9876 -c DefaultCluster
说明:如果指定的是集群名称,则会在集群中各个broker节点执行该命令;否则会在单个broker节点执行该命令。
| 参数 | 取值 | 含义 |
| --- | --- | --- |
| n | eg:192.168.1.2:9876 | namesrv地址(必填) |
| c | eg:DefaultCluster | 指定集群名称(与broker地址二选一) |
| b | eg:192.168.12.134:10911 | 指定broker地址(与集群名称二选一) |
**特别注意**开启Acl鉴权认证后导致Master/Slave和Dledger模式下Broker同步数据异常的问题,
在社区[4.5.1]版本中已经修复,具体的PR链接为:https://github.com/apache/rocketmq/pull/1149;
\ No newline at end of file
......@@ -26,7 +26,7 @@ RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer
Consumer消费的一种类型,该模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。
## 9 生产者组(Producer Group)
同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。
同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。
## 10 消费者组(Consumer Group)
同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。
......
......@@ -32,7 +32,7 @@
(2) 异步刷盘:能够充分利用OS的PageCache的优势,只要消息写入PageCache即可将成功的ACK返回给Producer端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了MQ的性能和吞吐量。
### 2 通信机制
RocketMQ消息队列集群主要包括NameServe、Broker(Master/Slave)、Producer、Consumer4个角色,基本通讯流程如下:
RocketMQ消息队列集群主要包括NameServer、Broker(Master/Slave)、Producer、Consumer4个角色,基本通讯流程如下:
(1) Broker启动后需要完成一次将自己注册至NameServer的操作;随后每隔30s时间定时向NameServer上报Topic路由信息。
......@@ -57,7 +57,7 @@ Header字段 | 类型 | Request说明 | Response说明
code |int | 请求操作码,应答方根据不同的请求码进行不同的业务处理 | 应答响应码。0表示成功,非0则表示各种错误
language | LanguageCode | 请求方实现的语言 | 应答方实现的语言
version | int | 请求方程序的版本 | 应答方程序的版本
opaque | int |相当于reqeustId,在同一个连接上的不同请求标识码,与响应消息中的相对应 | 应答不做修改直接返回
opaque | int |相当于requestId,在同一个连接上的不同请求标识码,与响应消息中的相对应 | 应答不做修改直接返回
flag | int | 区分是普通RPC还是onewayRPC得标志 | 区分是普通RPC还是onewayRPC得标志
remark | String | 传输自定义文本信息 | 传输自定义文本信息
extFields | HashMap<String, String> | 请求自定义扩展信息 | 响应自定义扩展信息
......@@ -95,7 +95,7 @@ M1 | NettyServerCodecThread_%d | Worker线程池
M2 | RemotingExecutorThread_%d | 业务processor处理线程池
### 3 消息过滤
RocketMQ分布式消息队列的消息过滤方式有别于其它MQ中间件,是在Consumer端订阅消息时再做消息过滤的。RocketMQ这么做是还是在于其Producer端写入消息和Consomer端订阅消息采用分离存储的机制来实现的,Consumer端订阅消息是需要通过ConsumeQueue这个消息消费的逻辑队列拿到一个索引,然后再从CommitLog里面读取真正的消息实体内容,所以说到底也是还绕不开其存储结构。其ConsumeQueue的存储结构如下,可以看到其中有8个字节存储的Message Tag的哈希值,基于Tag的消息过滤正式基于这个字段值的。
RocketMQ分布式消息队列的消息过滤方式有别于其它MQ中间件,是在Consumer端订阅消息时再做消息过滤的。RocketMQ这么做是在于其Producer端写入消息和Consumer端订阅消息采用分离存储的机制来实现的,Consumer端订阅消息是需要通过ConsumeQueue这个消息消费的逻辑队列拿到一个索引,然后再从CommitLog里面读取真正的消息实体内容,所以说到底也是还绕不开其存储结构。其ConsumeQueue的存储结构如下,可以看到其中有8个字节存储的Message Tag的哈希值,基于Tag的消息过滤正式基于这个字段值的。
![](image/rocketmq_design_7.png)
......
......@@ -1033,30 +1033,6 @@ $ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker
<td height=39 class=xl67 width=87 style='height:29.0pt;width:65pt'>-s</td>
<td class=xl68 width=87 style='width:65pt'>是否执行jstack</td>
</tr>
<tr height=39 style='height:29.0pt'>
<td rowspan=5 height=181 class=xl69 width=87 style='border-bottom:1.0pt
height:135.0pt;border-top:none;width:65pt'>getConsumerStatus</td>
<td rowspan=5 class=xl72 width=87 style='border-bottom:1.0pt
border-top:none;width:65pt'>获取 Consumer 消费进度</td>
<td class=xl67 width=87 style='width:65pt'>-g</td>
<td class=xl68 width=87 style='width:65pt'>消费者所属组名</td>
</tr>
<tr height=23 style='height:17.0pt'>
<td height=23 class=xl67 width=87 style='height:17.0pt;width:65pt'>-t</td>
<td class=xl68 width=87 style='width:65pt'>查询主题</td>
</tr>
<tr height=39 style='height:29.0pt'>
<td height=39 class=xl67 width=87 style='height:29.0pt;width:65pt'>-i</td>
<td class=xl68 width=87 style='width:65pt'>Consumer 客户端 ip</td>
</tr>
<tr height=57 style='height:43.0pt'>
<td height=57 class=xl67 width=87 style='height:43.0pt;width:65pt'>-n</td>
<td class=xl68 width=87 style='width:65pt'>NameServer 服务地址,格式 ip:port</td>
</tr>
<tr height=23 style='height:17.0pt'>
<td height=23 class=xl67 width=87 style='height:17.0pt;width:65pt'>-h</td>
<td class=xl68 width=87 style='width:65pt'>打印帮助</td>
</tr>
<tr height=57 style='height:43.0pt'>
<td rowspan=13 height=761 class=xl69 width=87 style='border-bottom:1.0pt
height:569.0pt;border-top:none;width:65pt'>updateSubGroup</td>
......
......@@ -25,7 +25,7 @@ export NAMESRV_ADDR=192.168.0.1:9876;192.168.0.2:9876
```
- HTTP static server addressing(default)
After client started, it will access a http static server address, as: <http://jmenv.tbsite.net:8080/rocketmq/nsaddr>, this URL return the following contents:
After client started, it will access the http static server address, as: <http://jmenv.tbsite.net:8080/rocketmq/nsaddr>, this URL return the following contents:
```text
192.168.0.1:9876;192.168.0.2:9876
......
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.5.2</version>
<version>4.6.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......@@ -51,12 +51,12 @@
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-openmessaging</artifactId>
<version>4.5.2</version>
<version>4.6.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>4.5.2</version>
<version>4.6.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>
......@@ -50,4 +50,4 @@ public class PushConsumer {
consumer.start();
System.out.printf("Broadcast Consumer Started.%n");
}
}
}
\ No newline at end of file
......@@ -20,7 +20,7 @@
<parent>
<artifactId>rocketmq-all</artifactId>
<groupId>org.apache.rocketmq</groupId>
<version>4.5.2</version>
<version>4.6.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.5.2</version>
<version>4.6.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rocketmq-logappender</artifactId>
......
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.5.2</version>
<version>4.6.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册