diff --git a/namesrv/pom.xml b/namesrv/pom.xml
index 01ef5b60fe951dfccff97a80ebd0df8f14cf1b47..42da143f03ae170776e736beee2dc1d760762cdc 100644
--- a/namesrv/pom.xml
+++ b/namesrv/pom.xml
@@ -48,5 +48,10 @@
org.slf4j
slf4j-api
+
+ com.google.guava
+ guava
+ test
+
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java
index a58a3b97fc27b2e7cb93ae5aec7cdda985665c36..dd152888aa72e6e9b50df10c834d082bf430195c 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java
@@ -56,7 +56,7 @@ public class ClusterTestRequestProcessor extends DefaultRequestProcessor {
final GetRouteInfoRequestHeader requestHeader =
(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
- TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
+ TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic(), false);
if (topicRouteData != null) {
String orderTopicConf =
this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
index f8bc55e7aab38e1f27abc75c674c28eab0a9d02b..11cfcd2dcbe40cc17191c2c9ce2ef251784c35eb 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.namesrv.processor;
import io.netty.channel.ChannelHandlerContext;
import java.io.UnsupportedEncodingException;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MQVersion;
@@ -27,8 +28,6 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.help.FAQUrl;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.namesrv.NamesrvUtil;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.common.protocol.RequestCode;
@@ -50,7 +49,11 @@ import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponse
import org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerResponseHeader;
-import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfoUnordered;
+import org.apache.rocketmq.common.protocol.route.TopicRouteDataNameSrv;
+import org.apache.rocketmq.common.sysflag.MessageSysFlag;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
@@ -346,7 +349,9 @@ public class DefaultRequestProcessor extends AsyncNettyRequestProcessor implemen
final GetRouteInfoRequestHeader requestHeader =
(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
- TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
+ boolean includeLogicalQueuesInfo = (requestHeader.getSysFlag() & MessageSysFlag.LOGICAL_QUEUE_FLAG) > 0;
+
+ TopicRouteDataNameSrv topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic(), includeLogicalQueuesInfo);
if (topicRouteData != null) {
if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
@@ -356,6 +361,16 @@ public class DefaultRequestProcessor extends AsyncNettyRequestProcessor implemen
topicRouteData.setOrderTopicConf(orderTopicConf);
}
+ Set logicalQueueIdsFilter = requestHeader.getLogicalQueueIdsFilter();
+ if (logicalQueueIdsFilter != null) {
+ LogicalQueuesInfoUnordered logicalQueuesInfo = topicRouteData.getLogicalQueuesInfoUnordered();
+ if (logicalQueuesInfo != null) {
+ LogicalQueuesInfoUnordered filtered = new LogicalQueuesInfoUnordered(logicalQueueIdsFilter.size());
+ logicalQueueIdsFilter.forEach(integer -> filtered.put(integer, logicalQueuesInfo.get(integer)));
+ topicRouteData.setLogicalQueuesInfoUnordered(filtered);
+ }
+ }
+
byte[] content = topicRouteData.encode();
response.setBody(content);
response.setCode(ResponseCode.SUCCESS);
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index edef87ce2d7e5d18f00610ad2619a55357d7a5d5..476e92bc338a21466e4c32db0275e0857688a113 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.namesrv.routeinfo;
import io.netty.channel.Channel;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -24,26 +25,32 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
+import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfoUnordered;
import org.apache.rocketmq.common.protocol.route.QueueData;
-import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteDataNameSrv;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.srvutil.ConcurrentHashMapUtil;
public class RouteInfoManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
@@ -54,6 +61,7 @@ public class RouteInfoManager {
private final HashMap> clusterAddrTable;
private final HashMap brokerLiveTable;
private final HashMap/* Filter Server */> filterServerTable;
+ private final ConcurrentMap logicalQueuesInfoTable;
public RouteInfoManager() {
this.topicQueueTable = new HashMap>(1024);
@@ -61,6 +69,7 @@ public class RouteInfoManager {
this.clusterAddrTable = new HashMap>(32);
this.brokerLiveTable = new HashMap(256);
this.filterServerTable = new HashMap>(256);
+ this.logicalQueuesInfoTable = new ConcurrentHashMap<>(1024);
}
public byte[] getAllClusterInfo() {
@@ -148,18 +157,28 @@ public class RouteInfoManager {
|| registerFirst) {
ConcurrentMap tcTable =
topicConfigWrapper.getTopicConfigTable();
+ Map logicalQueuesInfoMap = topicConfigWrapper.getLogicalQueuesInfoMap();
if (tcTable != null) {
for (Map.Entry entry : tcTable.entrySet()) {
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
+ if (logicalQueuesInfoMap != null) {
+ long startTime = System.nanoTime();
+ for (Map.Entry entry : logicalQueuesInfoMap.entrySet()) {
+ String topicName = entry.getKey();
+ LogicalQueuesInfoUnordered logicalQueuesInfo = ConcurrentHashMapUtil.computeIfAbsent(this.logicalQueuesInfoTable, topicName, ignore -> new LogicalQueuesInfoUnordered());
+ mergeLogicalQueuesInfo(brokerName, topicName, logicalQueuesInfo, entry.getValue());
+ }
+ log.debug("mergeQueueRouteDataTable topic={} time={}ns", System.nanoTime() - startTime);
+ }
}
}
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
new BrokerLiveInfo(
System.currentTimeMillis(),
- topicConfigWrapper.getDataVersion(),
+ topicConfigWrapper != null ? topicConfigWrapper.getDataVersion() : new DataVersion(),
channel,
haServerAddr));
if (null == prevBrokerLiveInfo) {
@@ -371,8 +390,12 @@ public class RouteInfoManager {
}
}
- public TopicRouteData pickupTopicRouteData(final String topic) {
- TopicRouteData topicRouteData = new TopicRouteData();
+ public TopicRouteDataNameSrv pickupTopicRouteData(final String topic) {
+ return pickupTopicRouteData(topic, false);
+ }
+
+ public TopicRouteDataNameSrv pickupTopicRouteData(final String topic, boolean includeLogicalQueuesInfo) {
+ TopicRouteDataNameSrv topicRouteData = new TopicRouteDataNameSrv();
boolean foundQueueData = false;
boolean foundBrokerData = false;
Set brokerNameSet = new HashSet();
@@ -420,6 +443,10 @@ public class RouteInfoManager {
log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);
if (foundBrokerData && foundQueueData) {
+ if (includeLogicalQueuesInfo) {
+ topicRouteData.setLogicalQueuesInfoUnordered(logicalQueuesInfoTable.get(topic));
+ }
+
return topicRouteData;
}
@@ -750,6 +777,34 @@ public class RouteInfoManager {
return topicList.encode();
}
+
+ private static void mergeLogicalQueuesInfo(String brokerName, String topicName,
+ LogicalQueuesInfoUnordered logicalQueuesInfoInNamesrv,
+ LogicalQueuesInfo logicalQueuesInfoFromBroker) {
+ Set newKeys = logicalQueuesInfoFromBroker.values()
+ .stream()
+ .flatMap(Collection::stream)
+ .filter(v -> Objects.equals(brokerName, v.getBrokerName()))
+ .map(v -> new LogicalQueuesInfoUnordered.Key(null, v.getQueueId(), v.getOffsetDelta()))
+ .collect(Collectors.toSet());
+ logicalQueuesInfoInNamesrv.values().forEach(m ->
+ m.values().removeIf(queueRouteData ->
+ Objects.equals(brokerName, queueRouteData.getBrokerName()) &&
+ !newKeys.contains(new LogicalQueuesInfoUnordered.Key(null, queueRouteData.getQueueId(), queueRouteData.getOffsetDelta()))));
+ logicalQueuesInfoFromBroker.forEach((logicalQueueId, queueRouteDataListFromBroker) -> {
+ if (logicalQueueId == null) {
+ log.warn("queueRouteDataTable topic {} contains null logicalQueueId: {}", topicName, logicalQueuesInfoFromBroker);
+ return;
+ }
+ queueRouteDataListFromBroker.stream()
+ .filter(queueRouteDataFromBroker -> Objects.equals(brokerName, queueRouteDataFromBroker.getBrokerName()))
+ .forEach(queueRouteDataFromBroker ->
+ ConcurrentHashMapUtil.computeIfAbsent(logicalQueuesInfoInNamesrv, logicalQueueId, ignored -> new ConcurrentHashMap<>(queueRouteDataListFromBroker.size()))
+ .put(new LogicalQueuesInfoUnordered.Key(brokerName, queueRouteDataFromBroker.getQueueId(), queueRouteDataFromBroker.getOffsetDelta()),
+ queueRouteDataFromBroker)
+ );
+ });
+ }
}
class BrokerLiveInfo {
diff --git a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessorTest.java b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessorTest.java
index d4a2f66f99f9fd073a52e5a285733d981ddcedb0..ff477a3e012e381ca4a66d708b1700662a6573f8 100644
--- a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessorTest.java
+++ b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessorTest.java
@@ -16,16 +16,22 @@
*/
package org.apache.rocketmq.namesrv.processor;
+import com.alibaba.fastjson.JSON;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.rocketmq.common.DataVersion;
+import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.namesrv.NamesrvConfig;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.common.protocol.RequestCode;
@@ -34,14 +40,23 @@ import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.header.namesrv.DeleteKVConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.PutKVConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
+import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
+import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfoUnordered;
+import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
+import org.apache.rocketmq.common.protocol.route.TopicRouteDataNameSrv;
+import org.apache.rocketmq.common.sysflag.MessageSysFlag;
+import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.assertj.core.util.Maps;
import org.junit.Before;
import org.junit.Test;
@@ -184,6 +199,98 @@ public class DefaultRequestProcessorTest {
.contains(new HashMap.SimpleEntry("broker", broker));
}
+ @Test
+ public void testProcessRequest_RegisterBrokerLogicalQueue() throws Exception {
+ String cluster = "cluster";
+ String broker1Name = "broker1";
+ String broker1Addr = "10.10.1.1";
+ String broker2Name = "broker2";
+ String broker2Addr = "10.10.1.2";
+ String topic = "foobar";
+
+ LogicalQueueRouteData queueRouteData1 = new LogicalQueueRouteData(0, 0, new MessageQueue(topic, broker1Name, 0), MessageQueueRouteState.ReadOnly, 0, 10, 100, 100, broker1Addr);
+ {
+ RegisterBrokerRequestHeader header = new RegisterBrokerRequestHeader();
+ header.setBrokerName(broker1Name);
+ RemotingCommand request = RemotingCommand.createRequestCommand(
+ RequestCode.REGISTER_BROKER, header);
+ request.addExtField("brokerName", broker1Name);
+ request.addExtField("brokerAddr", broker1Addr);
+ request.addExtField("clusterName", cluster);
+ request.addExtField("haServerAddr", "10.10.2.1");
+ request.addExtField("brokerId", String.valueOf(MixAll.MASTER_ID));
+ TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
+ topicConfigSerializeWrapper.setTopicConfigTable(new ConcurrentHashMap<>(Collections.singletonMap(topic, new TopicConfig(topic))));
+ topicConfigSerializeWrapper.setLogicalQueuesInfoMap(Maps.newHashMap(topic, new LogicalQueuesInfo(Collections.singletonMap(0, Lists.newArrayList(
+ queueRouteData1
+ )))));
+ topicConfigSerializeWrapper.setDataVersion(new DataVersion());
+ request.setBody(RemotingSerializable.encode(topicConfigSerializeWrapper));
+
+ ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+ when(ctx.channel()).thenReturn(null);
+
+ RemotingCommand response = defaultRequestProcessor.processRequest(ctx, request);
+
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ assertThat(response.getRemark()).isNull();
+ }
+ LogicalQueueRouteData queueRouteData2 = new LogicalQueueRouteData(0, 100, new MessageQueue(topic, broker2Name, 0), MessageQueueRouteState.Normal, 0, -1, -1, -1, broker2Addr);
+ LogicalQueueRouteData queueRouteData3 = new LogicalQueueRouteData(1, 100, new MessageQueue(topic, broker2Name, 0), MessageQueueRouteState.Normal, 0, -1, -1, -1, broker2Addr);
+ {
+ RegisterBrokerRequestHeader header = new RegisterBrokerRequestHeader();
+ header.setBrokerName(broker2Name);
+ RemotingCommand request = RemotingCommand.createRequestCommand(
+ RequestCode.REGISTER_BROKER, header);
+ request.addExtField("brokerName", broker2Name);
+ request.addExtField("brokerAddr", broker2Addr);
+ request.addExtField("clusterName", cluster);
+ request.addExtField("haServerAddr", "10.10.2.1");
+ request.addExtField("brokerId", String.valueOf(MixAll.MASTER_ID));
+ TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
+ topicConfigSerializeWrapper.setTopicConfigTable(new ConcurrentHashMap<>(Collections.singletonMap(topic, new TopicConfig(topic))));
+ topicConfigSerializeWrapper.setLogicalQueuesInfoMap(Maps.newHashMap(topic, new LogicalQueuesInfo(ImmutableMap.of(
+ 0, Collections.singletonList(queueRouteData2),
+ 1, Collections.singletonList(queueRouteData3)
+ ))));
+ topicConfigSerializeWrapper.setDataVersion(new DataVersion());
+ request.setBody(RemotingSerializable.encode(topicConfigSerializeWrapper));
+
+ ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+ when(ctx.channel()).thenReturn(null);
+
+ RemotingCommand response = defaultRequestProcessor.processRequest(ctx, request);
+
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ assertThat(response.getRemark()).isNull();
+ }
+
+ {
+ GetRouteInfoRequestHeader header = new GetRouteInfoRequestHeader();
+ header.setTopic(topic);
+ header.setSysFlag(MessageSysFlag.LOGICAL_QUEUE_FLAG);
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, header);
+ request.makeCustomHeaderToNet();
+
+ ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+ when(ctx.channel()).thenReturn(null);
+
+ RemotingCommand response = defaultRequestProcessor.processRequest(ctx, request);
+
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
+ TopicRouteDataNameSrv topicRouteDataNameSrv = JSON.parseObject(response.getBody(), TopicRouteDataNameSrv.class);
+ assertThat(topicRouteDataNameSrv).isNotNull();
+ LogicalQueuesInfoUnordered logicalQueuesInfoUnordered = new LogicalQueuesInfoUnordered();
+ logicalQueuesInfoUnordered.put(0, ImmutableMap.of(
+ new LogicalQueuesInfoUnordered.Key(queueRouteData1.getBrokerName(), queueRouteData1.getQueueId(), queueRouteData1.getOffsetDelta()), queueRouteData1,
+ new LogicalQueuesInfoUnordered.Key(queueRouteData2.getBrokerName(), queueRouteData2.getQueueId(), queueRouteData2.getOffsetDelta()), queueRouteData2
+ ));
+ logicalQueuesInfoUnordered.put(1, ImmutableMap.of(new LogicalQueuesInfoUnordered.Key(queueRouteData3.getBrokerName(), queueRouteData3.getQueueId(), queueRouteData3.getOffsetDelta()), queueRouteData3));
+ assertThat(topicRouteDataNameSrv.getLogicalQueuesInfoUnordered()).isEqualTo(logicalQueuesInfoUnordered);
+ }
+ }
+
@Test
public void testProcessRequest_RegisterBrokerWithFilterServer() throws RemotingCommandException,
NoSuchFieldException, IllegalAccessException {