提交 acbc0be0 编写于 作者: C chenzlalvin

[RIP-21] submodule namesrv

上级 238f9bcc
......@@ -48,5 +48,10 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
......@@ -56,7 +56,7 @@ public class ClusterTestRequestProcessor extends DefaultRequestProcessor {
final GetRouteInfoRequestHeader requestHeader =
(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic(), false);
if (topicRouteData != null) {
String orderTopicConf =
this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
......
......@@ -19,6 +19,7 @@ package org.apache.rocketmq.namesrv.processor;
import io.netty.channel.ChannelHandlerContext;
import java.io.UnsupportedEncodingException;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MQVersion;
......@@ -27,8 +28,6 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.namesrv.NamesrvUtil;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.common.protocol.RequestCode;
......@@ -50,7 +49,11 @@ import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponse
import org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerResponseHeader;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfoUnordered;
import org.apache.rocketmq.common.protocol.route.TopicRouteDataNameSrv;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
......@@ -346,7 +349,9 @@ public class DefaultRequestProcessor extends AsyncNettyRequestProcessor implemen
final GetRouteInfoRequestHeader requestHeader =
(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
boolean includeLogicalQueuesInfo = (requestHeader.getSysFlag() & MessageSysFlag.LOGICAL_QUEUE_FLAG) > 0;
TopicRouteDataNameSrv topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic(), includeLogicalQueuesInfo);
if (topicRouteData != null) {
if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
......@@ -356,6 +361,16 @@ public class DefaultRequestProcessor extends AsyncNettyRequestProcessor implemen
topicRouteData.setOrderTopicConf(orderTopicConf);
}
Set<Integer> logicalQueueIdsFilter = requestHeader.getLogicalQueueIdsFilter();
if (logicalQueueIdsFilter != null) {
LogicalQueuesInfoUnordered logicalQueuesInfo = topicRouteData.getLogicalQueuesInfoUnordered();
if (logicalQueuesInfo != null) {
LogicalQueuesInfoUnordered filtered = new LogicalQueuesInfoUnordered(logicalQueueIdsFilter.size());
logicalQueueIdsFilter.forEach(integer -> filtered.put(integer, logicalQueuesInfo.get(integer)));
topicRouteData.setLogicalQueuesInfoUnordered(filtered);
}
}
byte[] content = topicRouteData.encode();
response.setBody(content);
response.setCode(ResponseCode.SUCCESS);
......
......@@ -17,6 +17,7 @@
package org.apache.rocketmq.namesrv.routeinfo;
import io.netty.channel.Channel;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
......@@ -24,26 +25,32 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfoUnordered;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.protocol.route.TopicRouteDataNameSrv;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.srvutil.ConcurrentHashMapUtil;
public class RouteInfoManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
......@@ -54,6 +61,7 @@ public class RouteInfoManager {
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
private final ConcurrentMap<String/* topic */, LogicalQueuesInfoUnordered> logicalQueuesInfoTable;
public RouteInfoManager() {
this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
......@@ -61,6 +69,7 @@ public class RouteInfoManager {
this.clusterAddrTable = new HashMap<String, Set<String>>(32);
this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
this.filterServerTable = new HashMap<String, List<String>>(256);
this.logicalQueuesInfoTable = new ConcurrentHashMap<>(1024);
}
public byte[] getAllClusterInfo() {
......@@ -148,18 +157,28 @@ public class RouteInfoManager {
|| registerFirst) {
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
Map<String, LogicalQueuesInfo> logicalQueuesInfoMap = topicConfigWrapper.getLogicalQueuesInfoMap();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
if (logicalQueuesInfoMap != null) {
long startTime = System.nanoTime();
for (Map.Entry<String, LogicalQueuesInfo> entry : logicalQueuesInfoMap.entrySet()) {
String topicName = entry.getKey();
LogicalQueuesInfoUnordered logicalQueuesInfo = ConcurrentHashMapUtil.computeIfAbsent(this.logicalQueuesInfoTable, topicName, ignore -> new LogicalQueuesInfoUnordered());
mergeLogicalQueuesInfo(brokerName, topicName, logicalQueuesInfo, entry.getValue());
}
log.debug("mergeQueueRouteDataTable topic={} time={}ns", System.nanoTime() - startTime);
}
}
}
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
new BrokerLiveInfo(
System.currentTimeMillis(),
topicConfigWrapper.getDataVersion(),
topicConfigWrapper != null ? topicConfigWrapper.getDataVersion() : new DataVersion(),
channel,
haServerAddr));
if (null == prevBrokerLiveInfo) {
......@@ -371,8 +390,12 @@ public class RouteInfoManager {
}
}
public TopicRouteData pickupTopicRouteData(final String topic) {
TopicRouteData topicRouteData = new TopicRouteData();
public TopicRouteDataNameSrv pickupTopicRouteData(final String topic) {
return pickupTopicRouteData(topic, false);
}
public TopicRouteDataNameSrv pickupTopicRouteData(final String topic, boolean includeLogicalQueuesInfo) {
TopicRouteDataNameSrv topicRouteData = new TopicRouteDataNameSrv();
boolean foundQueueData = false;
boolean foundBrokerData = false;
Set<String> brokerNameSet = new HashSet<String>();
......@@ -420,6 +443,10 @@ public class RouteInfoManager {
log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);
if (foundBrokerData && foundQueueData) {
if (includeLogicalQueuesInfo) {
topicRouteData.setLogicalQueuesInfoUnordered(logicalQueuesInfoTable.get(topic));
}
return topicRouteData;
}
......@@ -750,6 +777,34 @@ public class RouteInfoManager {
return topicList.encode();
}
private static void mergeLogicalQueuesInfo(String brokerName, String topicName,
LogicalQueuesInfoUnordered logicalQueuesInfoInNamesrv,
LogicalQueuesInfo logicalQueuesInfoFromBroker) {
Set<LogicalQueuesInfoUnordered.Key> newKeys = logicalQueuesInfoFromBroker.values()
.stream()
.flatMap(Collection::stream)
.filter(v -> Objects.equals(brokerName, v.getBrokerName()))
.map(v -> new LogicalQueuesInfoUnordered.Key(null, v.getQueueId(), v.getOffsetDelta()))
.collect(Collectors.toSet());
logicalQueuesInfoInNamesrv.values().forEach(m ->
m.values().removeIf(queueRouteData ->
Objects.equals(brokerName, queueRouteData.getBrokerName()) &&
!newKeys.contains(new LogicalQueuesInfoUnordered.Key(null, queueRouteData.getQueueId(), queueRouteData.getOffsetDelta()))));
logicalQueuesInfoFromBroker.forEach((logicalQueueId, queueRouteDataListFromBroker) -> {
if (logicalQueueId == null) {
log.warn("queueRouteDataTable topic {} contains null logicalQueueId: {}", topicName, logicalQueuesInfoFromBroker);
return;
}
queueRouteDataListFromBroker.stream()
.filter(queueRouteDataFromBroker -> Objects.equals(brokerName, queueRouteDataFromBroker.getBrokerName()))
.forEach(queueRouteDataFromBroker ->
ConcurrentHashMapUtil.computeIfAbsent(logicalQueuesInfoInNamesrv, logicalQueueId, ignored -> new ConcurrentHashMap<>(queueRouteDataListFromBroker.size()))
.put(new LogicalQueuesInfoUnordered.Key(brokerName, queueRouteDataFromBroker.getQueueId(), queueRouteDataFromBroker.getOffsetDelta()),
queueRouteDataFromBroker)
);
});
}
}
class BrokerLiveInfo {
......
......@@ -16,16 +16,22 @@
*/
package org.apache.rocketmq.namesrv.processor;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.namesrv.NamesrvConfig;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.common.protocol.RequestCode;
......@@ -34,14 +40,23 @@ import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.header.namesrv.DeleteKVConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigResponseHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.PutKVConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfoUnordered;
import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
import org.apache.rocketmq.common.protocol.route.TopicRouteDataNameSrv;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.assertj.core.util.Maps;
import org.junit.Before;
import org.junit.Test;
......@@ -184,6 +199,98 @@ public class DefaultRequestProcessorTest {
.contains(new HashMap.SimpleEntry("broker", broker));
}
@Test
public void testProcessRequest_RegisterBrokerLogicalQueue() throws Exception {
String cluster = "cluster";
String broker1Name = "broker1";
String broker1Addr = "10.10.1.1";
String broker2Name = "broker2";
String broker2Addr = "10.10.1.2";
String topic = "foobar";
LogicalQueueRouteData queueRouteData1 = new LogicalQueueRouteData(0, 0, new MessageQueue(topic, broker1Name, 0), MessageQueueRouteState.ReadOnly, 0, 10, 100, 100, broker1Addr);
{
RegisterBrokerRequestHeader header = new RegisterBrokerRequestHeader();
header.setBrokerName(broker1Name);
RemotingCommand request = RemotingCommand.createRequestCommand(
RequestCode.REGISTER_BROKER, header);
request.addExtField("brokerName", broker1Name);
request.addExtField("brokerAddr", broker1Addr);
request.addExtField("clusterName", cluster);
request.addExtField("haServerAddr", "10.10.2.1");
request.addExtField("brokerId", String.valueOf(MixAll.MASTER_ID));
TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
topicConfigSerializeWrapper.setTopicConfigTable(new ConcurrentHashMap<>(Collections.singletonMap(topic, new TopicConfig(topic))));
topicConfigSerializeWrapper.setLogicalQueuesInfoMap(Maps.newHashMap(topic, new LogicalQueuesInfo(Collections.singletonMap(0, Lists.newArrayList(
queueRouteData1
)))));
topicConfigSerializeWrapper.setDataVersion(new DataVersion());
request.setBody(RemotingSerializable.encode(topicConfigSerializeWrapper));
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
when(ctx.channel()).thenReturn(null);
RemotingCommand response = defaultRequestProcessor.processRequest(ctx, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
assertThat(response.getRemark()).isNull();
}
LogicalQueueRouteData queueRouteData2 = new LogicalQueueRouteData(0, 100, new MessageQueue(topic, broker2Name, 0), MessageQueueRouteState.Normal, 0, -1, -1, -1, broker2Addr);
LogicalQueueRouteData queueRouteData3 = new LogicalQueueRouteData(1, 100, new MessageQueue(topic, broker2Name, 0), MessageQueueRouteState.Normal, 0, -1, -1, -1, broker2Addr);
{
RegisterBrokerRequestHeader header = new RegisterBrokerRequestHeader();
header.setBrokerName(broker2Name);
RemotingCommand request = RemotingCommand.createRequestCommand(
RequestCode.REGISTER_BROKER, header);
request.addExtField("brokerName", broker2Name);
request.addExtField("brokerAddr", broker2Addr);
request.addExtField("clusterName", cluster);
request.addExtField("haServerAddr", "10.10.2.1");
request.addExtField("brokerId", String.valueOf(MixAll.MASTER_ID));
TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
topicConfigSerializeWrapper.setTopicConfigTable(new ConcurrentHashMap<>(Collections.singletonMap(topic, new TopicConfig(topic))));
topicConfigSerializeWrapper.setLogicalQueuesInfoMap(Maps.newHashMap(topic, new LogicalQueuesInfo(ImmutableMap.of(
0, Collections.singletonList(queueRouteData2),
1, Collections.singletonList(queueRouteData3)
))));
topicConfigSerializeWrapper.setDataVersion(new DataVersion());
request.setBody(RemotingSerializable.encode(topicConfigSerializeWrapper));
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
when(ctx.channel()).thenReturn(null);
RemotingCommand response = defaultRequestProcessor.processRequest(ctx, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
assertThat(response.getRemark()).isNull();
}
{
GetRouteInfoRequestHeader header = new GetRouteInfoRequestHeader();
header.setTopic(topic);
header.setSysFlag(MessageSysFlag.LOGICAL_QUEUE_FLAG);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, header);
request.makeCustomHeaderToNet();
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
when(ctx.channel()).thenReturn(null);
RemotingCommand response = defaultRequestProcessor.processRequest(ctx, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
TopicRouteDataNameSrv topicRouteDataNameSrv = JSON.parseObject(response.getBody(), TopicRouteDataNameSrv.class);
assertThat(topicRouteDataNameSrv).isNotNull();
LogicalQueuesInfoUnordered logicalQueuesInfoUnordered = new LogicalQueuesInfoUnordered();
logicalQueuesInfoUnordered.put(0, ImmutableMap.of(
new LogicalQueuesInfoUnordered.Key(queueRouteData1.getBrokerName(), queueRouteData1.getQueueId(), queueRouteData1.getOffsetDelta()), queueRouteData1,
new LogicalQueuesInfoUnordered.Key(queueRouteData2.getBrokerName(), queueRouteData2.getQueueId(), queueRouteData2.getOffsetDelta()), queueRouteData2
));
logicalQueuesInfoUnordered.put(1, ImmutableMap.of(new LogicalQueuesInfoUnordered.Key(queueRouteData3.getBrokerName(), queueRouteData3.getQueueId(), queueRouteData3.getOffsetDelta()), queueRouteData3));
assertThat(topicRouteDataNameSrv.getLogicalQueuesInfoUnordered()).isEqualTo(logicalQueuesInfoUnordered);
}
}
@Test
public void testProcessRequest_RegisterBrokerWithFilterServer() throws RemotingCommandException,
NoSuchFieldException, IllegalAccessException {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册