提交 7707275e 编写于 作者: D dongeforever

Finish the register process in namesrv

上级 7bbc7ddf
......@@ -1084,7 +1084,7 @@ public class BrokerController {
Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = topicConfigList.stream()
.map(TopicConfig::getTopicName)
.map(topicName -> Optional.ofNullable(this.topicQueueMappingManager.getTopicQueueMapping(topicName))
.map(info -> new AbstractMap.SimpleImmutableEntry<>(topicName, info.clone4register()))
.map(info -> new AbstractMap.SimpleImmutableEntry<>(topicName, info.cloneAsMappingInfo()))
.orElse(null))
.filter(Objects::nonNull)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
......@@ -1103,7 +1103,7 @@ public class BrokerController {
topicConfigWrapper.setTopicConfigTable(this.getTopicConfigManager().getTopicConfigTable());
topicConfigWrapper.setTopicQueueMappingInfoMap(this.getTopicQueueMappingManager().getTopicQueueMappingTable().entrySet().stream().map(
entry -> new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), entry.getValue().clone4register())
entry -> new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), entry.getValue().cloneAsMappingInfo())
).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
......
......@@ -1726,11 +1726,11 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
response.setRemark("No topic in this broker. topic: " + requestHeader.getTopic());
return response;
}
TopicQueueMappingInfo topicQueueMappingInfo = null;
TopicQueueMappingDetail topicQueueMappingDetail = null;
if (Boolean.TRUE.equals(requestHeader.getWithMapping())) {
topicQueueMappingInfo = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(requestHeader.getTopic());
topicQueueMappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(requestHeader.getTopic());
}
String content = JSONObject.toJSONString(new TopicConfigAndQueueMapping(topicConfig, topicQueueMappingInfo));
String content = JSONObject.toJSONString(new TopicConfigAndQueueMapping(topicConfig, topicQueueMappingDetail));
try {
response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET));
} catch (UnsupportedEncodingException e) {
......
......@@ -23,7 +23,7 @@ import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.body.TopicQueueMappingSerializeWrapper;
import org.apache.rocketmq.common.TopicQueueMappingInfo;
import org.apache.rocketmq.common.TopicQueueMappingDetail;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
......@@ -41,18 +41,18 @@ public class TopicQueueMappingManager extends ConfigManager {
private final DataVersion dataVersion = new DataVersion();
private transient BrokerController brokerController;
private final ConcurrentMap<String, TopicQueueMappingInfo> topicQueueMappingTable = new ConcurrentHashMap<>();
private final ConcurrentMap<String, TopicQueueMappingDetail> topicQueueMappingTable = new ConcurrentHashMap<>();
public TopicQueueMappingManager(BrokerController brokerController) {
this.brokerController = brokerController;
}
public void updateTopicQueueMapping(TopicQueueMappingInfo topicQueueMappingInfo) {
topicQueueMappingTable.put(topicQueueMappingInfo.getTopic(), topicQueueMappingInfo);
public void updateTopicQueueMapping(TopicQueueMappingDetail topicQueueMappingDetail) {
topicQueueMappingTable.put(topicQueueMappingDetail.getTopic(), topicQueueMappingDetail);
}
public TopicQueueMappingInfo getTopicQueueMapping(String topic) {
public TopicQueueMappingDetail getTopicQueueMapping(String topic) {
return topicQueueMappingTable.get(topic);
}
......@@ -86,7 +86,7 @@ public class TopicQueueMappingManager extends ConfigManager {
}
}
public ConcurrentMap<String, TopicQueueMappingInfo> getTopicQueueMappingTable() {
public ConcurrentMap<String, TopicQueueMappingDetail> getTopicQueueMappingTable() {
return topicQueueMappingTable;
}
......
......@@ -18,15 +18,15 @@ package org.apache.rocketmq.common;
public class TopicConfigAndQueueMapping {
private TopicConfig topicConfig;
private TopicQueueMappingInfo topicQueueMappingInfo;
private TopicQueueMappingDetail topicQueueMappingDetail;
public TopicConfigAndQueueMapping(TopicConfig topicConfig, TopicQueueMappingInfo topicQueueMappingInfo) {
public TopicConfigAndQueueMapping(TopicConfig topicConfig, TopicQueueMappingDetail topicQueueMappingDetail) {
this.topicConfig = topicConfig;
this.topicQueueMappingInfo = topicQueueMappingInfo;
this.topicQueueMappingDetail = topicQueueMappingDetail;
}
public TopicQueueMappingInfo getTopicQueueMappingInfo() {
return topicQueueMappingInfo;
public TopicQueueMappingDetail getTopicQueueMappingInfo() {
return topicQueueMappingDetail;
}
public TopicConfig getTopicConfig() {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common;
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
// the mapping info in current broker, do not register to nameserver
ConcurrentMap<Integer/*global id*/, ImmutableList<LogicQueueMappingItem>> hostedQueues = new ConcurrentHashMap<Integer, ImmutableList<LogicQueueMappingItem>>();
public TopicQueueMappingDetail(String topic, int totalQueues, String bname) {
super(topic, totalQueues, bname);
buildIdMap();
}
public boolean putMappingInfo(Integer globalId, ImmutableList<LogicQueueMappingItem> mappingInfo) {
if (mappingInfo.isEmpty()) {
return true;
}
hostedQueues.put(globalId, mappingInfo);
buildIdMap();
return true;
}
public void buildIdMap() {
this.currIdMap = buildIdMap(LEVEL_0);
this.prevIdMap = buildIdMap(LEVEL_1);
}
public ConcurrentMap<Integer, Integer> buildIdMap(int level) {
//level 0 means current leader in this broker
//level 1 means previous leader in this broker
assert level == LEVEL_0 || level == LEVEL_1;
if (hostedQueues == null || hostedQueues.isEmpty()) {
return new ConcurrentHashMap<Integer, Integer>();
}
ConcurrentMap<Integer, Integer> tmpIdMap = new ConcurrentHashMap<Integer, Integer>();
for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>> entry: hostedQueues.entrySet()) {
Integer globalId = entry.getKey();
ImmutableList<LogicQueueMappingItem> items = entry.getValue();
if (level == LEVEL_0
&& items.size() >= 1) {
LogicQueueMappingItem curr = items.get(items.size() - 1);
if (bname.equals(curr.getBname())) {
tmpIdMap.put(curr.getQueueId(), globalId);
}
} else if (level == LEVEL_1
&& items.size() >= 2) {
LogicQueueMappingItem prev = items.get(items.size() - 1);
if (bname.equals(prev.getBname())) {
tmpIdMap.put(prev.getQueueId(), globalId);
}
}
}
return tmpIdMap;
}
public List<LogicQueueMappingItem> getMappingInfo(Integer globalId) {
return hostedQueues.get(globalId);
}
public TopicQueueMappingInfo cloneAsMappingInfo() {
TopicQueueMappingInfo topicQueueMappingInfo = new TopicQueueMappingInfo(this.topic, this.totalQueues, this.bname);
topicQueueMappingInfo.currIdMap = this.buildIdMap(LEVEL_0);
topicQueueMappingInfo.prevIdMap = this.buildIdMap(LEVEL_1);
return topicQueueMappingInfo;
}
public int getTotalQueues() {
return totalQueues;
}
public void setTotalQueues(int totalQueues) {
this.totalQueues = totalQueues;
}
public String getBname() {
return bname;
}
public String getTopic() {
return topic;
}
}
......@@ -16,11 +16,8 @@
*/
package org.apache.rocketmq.common;
import com.google.common.collect.ImmutableList;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
......@@ -28,78 +25,20 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
public static final int LEVEL_0 = 0;
public static final int LEVEL_1 = 1;
private String topic; // redundant field
private int totalQueues;
private String bname; //identify the hosted broker name
// the mapping info in current broker, do not register to nameserver
private ConcurrentMap<Integer/*global id*/, ImmutableList<LogicQueueMappingItem>> hostedQueues = new ConcurrentHashMap<Integer, ImmutableList<LogicQueueMappingItem>>();
String topic; // redundant field
int totalQueues;
String bname; //identify the hosted broker name
//register to broker to construct the route
private ConcurrentMap<Integer, Integer> currIdMap = new ConcurrentHashMap<Integer, Integer>();
ConcurrentMap<Integer, Integer> currIdMap = new ConcurrentHashMap<Integer, Integer>();
//register to broker to help detect remapping failure
private ConcurrentMap<Integer, Integer> prevIdMap = new ConcurrentHashMap<Integer, Integer>();
protected ConcurrentMap<Integer, Integer> prevIdMap = new ConcurrentHashMap<Integer, Integer>();
public TopicQueueMappingInfo(String topic, int totalQueues, String bname) {
this.topic = topic;
this.totalQueues = totalQueues;
this.bname = bname;
buildIdMap();
}
public boolean putMappingInfo(Integer globalId, ImmutableList<LogicQueueMappingItem> mappingInfo) {
if (mappingInfo.isEmpty()) {
return true;
}
hostedQueues.put(globalId, mappingInfo);
buildIdMap();
return true;
}
public void buildIdMap() {
this.currIdMap = buildIdMap(LEVEL_0);
this.prevIdMap = buildIdMap(LEVEL_1);
}
public ConcurrentMap<Integer, Integer> buildIdMap(int level) {
//level 0 means current leader in this broker
//level 1 means previous leader in this broker
assert level == LEVEL_0 || level == LEVEL_1;
if (hostedQueues == null || hostedQueues.isEmpty()) {
return new ConcurrentHashMap<Integer, Integer>();
}
ConcurrentMap<Integer, Integer> tmpIdMap = new ConcurrentHashMap<Integer, Integer>();
for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>> entry: hostedQueues.entrySet()) {
Integer globalId = entry.getKey();
ImmutableList<LogicQueueMappingItem> items = entry.getValue();
if (level == LEVEL_0
&& items.size() >= 1) {
LogicQueueMappingItem curr = items.get(items.size() - 1);
if (bname.equals(curr.getBname())) {
tmpIdMap.put(curr.getQueueId(), globalId);
}
} else if (level == LEVEL_1
&& items.size() >= 2) {
LogicQueueMappingItem prev = items.get(items.size() - 1);
if (bname.equals(prev.getBname())) {
tmpIdMap.put(prev.getQueueId(), globalId);
}
}
}
return tmpIdMap;
}
public List<LogicQueueMappingItem> getMappingInfo(Integer globalId) {
return hostedQueues.get(globalId);
}
public TopicQueueMappingInfo clone4register() {
TopicQueueMappingInfo topicQueueMappingInfo = new TopicQueueMappingInfo(this.topic, this.totalQueues, this.bname);
topicQueueMappingInfo.currIdMap = this.buildIdMap(LEVEL_0);
topicQueueMappingInfo.prevIdMap = this.buildIdMap(LEVEL_1);
return topicQueueMappingInfo;
}
public int getTotalQueues() {
return totalQueues;
......@@ -116,4 +55,12 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
public String getTopic() {
return topic;
}
public ConcurrentMap<Integer, Integer> getCurrIdMap() {
return currIdMap;
}
public ConcurrentMap<Integer, Integer> getPrevIdMap() {
return prevIdMap;
}
}
......@@ -33,6 +33,7 @@ import java.util.zip.InflaterInputStream;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicQueueMappingDetail;
import org.apache.rocketmq.common.TopicQueueMappingInfo;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
......
......@@ -16,9 +16,9 @@
*/
package org.apache.rocketmq.common.protocol.body;
import org.apache.rocketmq.common.TopicQueueMappingInfo;
import org.apache.rocketmq.common.TopicQueueMappingDetail;
public class TopicQueueMappingBody extends TopicQueueMappingInfo {
public class TopicQueueMappingBody extends TopicQueueMappingDetail {
public TopicQueueMappingBody(String topic, int totalQueues, String bname) {
super(topic, totalQueues, bname);
......
......@@ -18,20 +18,20 @@
package org.apache.rocketmq.common.protocol.body;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.TopicQueueMappingInfo;
import org.apache.rocketmq.common.TopicQueueMappingDetail;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import java.util.Map;
public class TopicQueueMappingSerializeWrapper extends RemotingSerializable {
private Map<String/* topic */, TopicQueueMappingInfo> topicQueueMappingInfoMap;
private Map<String/* topic */, TopicQueueMappingDetail> topicQueueMappingInfoMap;
private DataVersion dataVersion = new DataVersion();
public Map<String, TopicQueueMappingInfo> getTopicQueueMappingInfoMap() {
public Map<String, TopicQueueMappingDetail> getTopicQueueMappingInfoMap() {
return topicQueueMappingInfoMap;
}
public void setTopicQueueMappingInfoMap(Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap) {
public void setTopicQueueMappingInfoMap(Map<String, TopicQueueMappingDetail> topicQueueMappingInfoMap) {
this.topicQueueMappingInfoMap = topicQueueMappingInfoMap;
}
......
......@@ -20,7 +20,6 @@
*/
package org.apache.rocketmq.common.protocol.header.namesrv;
import java.util.Set;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
......@@ -29,9 +28,6 @@ public class GetRouteInfoRequestHeader implements CommandCustomHeader {
@CFNotNull
private String topic;
private int sysFlag;
private Set<Integer> logicalQueueIdsFilter;
@Override
public void checkFields() throws RemotingCommandException {
}
......@@ -43,20 +39,4 @@ public class GetRouteInfoRequestHeader implements CommandCustomHeader {
public void setTopic(String topic) {
this.topic = topic;
}
public int getSysFlag() {
return sysFlag;
}
public void setSysFlag(int sysFlag) {
this.sysFlag = sysFlag;
}
public void setLogicalQueueIdsFilter(Set<Integer> filter) {
this.logicalQueueIdsFilter = filter;
}
public Set<Integer> getLogicalQueueIdsFilter() {
return logicalQueueIdsFilter;
}
}
......@@ -23,6 +23,9 @@ package org.apache.rocketmq.common.protocol.route;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.common.TopicQueueMappingInfo;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
public class TopicRouteData extends RemotingSerializable {
......@@ -30,7 +33,7 @@ public class TopicRouteData extends RemotingSerializable {
private List<QueueData> queueDatas;
private List<BrokerData> brokerDatas;
private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
private LogicalQueuesInfo logicalQueuesInfo;
private Map<String, TopicQueueMappingInfo> topicQueueMappingByBroker;
public TopicRouteData() {
}
......@@ -53,8 +56,8 @@ public class TopicRouteData extends RemotingSerializable {
this.filterServerTable.putAll(topicRouteData.filterServerTable);
}
if (topicRouteData.logicalQueuesInfo != null) {
this.logicalQueuesInfo = new LogicalQueuesInfo(topicRouteData.logicalQueuesInfo);
if (topicRouteData.topicQueueMappingByBroker != null) {
this.topicQueueMappingByBroker = new HashMap<String, TopicQueueMappingInfo>(topicRouteData.topicQueueMappingByBroker);
}
}
......@@ -90,12 +93,12 @@ public class TopicRouteData extends RemotingSerializable {
this.orderTopicConf = orderTopicConf;
}
public LogicalQueuesInfo getLogicalQueuesInfo() {
return logicalQueuesInfo;
public Map<String, TopicQueueMappingInfo> getTopicQueueMappingByBroker() {
return topicQueueMappingByBroker;
}
public void setLogicalQueuesInfo(LogicalQueuesInfo logicalQueuesInfo) {
this.logicalQueuesInfo = logicalQueuesInfo;
public void setTopicQueueMappingByBroker(Map<String, TopicQueueMappingInfo> topicQueueMappingByBroker) {
this.topicQueueMappingByBroker = topicQueueMappingByBroker;
}
@Override
......@@ -106,7 +109,7 @@ public class TopicRouteData extends RemotingSerializable {
result = prime * result + ((orderTopicConf == null) ? 0 : orderTopicConf.hashCode());
result = prime * result + ((queueDatas == null) ? 0 : queueDatas.hashCode());
result = prime * result + ((filterServerTable == null) ? 0 : filterServerTable.hashCode());
result = prime * result + ((logicalQueuesInfo == null) ? 0 : logicalQueuesInfo.hashCode());
result = prime * result + ((topicQueueMappingByBroker == null) ? 0 : topicQueueMappingByBroker.hashCode());
return result;
}
......@@ -139,10 +142,10 @@ public class TopicRouteData extends RemotingSerializable {
return false;
} else if (!filterServerTable.equals(other.filterServerTable))
return false;
if (logicalQueuesInfo == null) {
if (other.logicalQueuesInfo != null)
if (topicQueueMappingByBroker == null) {
if (other.topicQueueMappingByBroker != null)
return false;
} else if (!logicalQueuesInfo.equals(other.logicalQueuesInfo))
} else if (!topicQueueMappingByBroker.equals(other.topicQueueMappingByBroker))
return false;
return true;
}
......@@ -150,6 +153,6 @@ public class TopicRouteData extends RemotingSerializable {
@Override
public String toString() {
return "TopicRouteData [orderTopicConf=" + orderTopicConf + ", queueDatas=" + queueDatas
+ ", brokerDatas=" + brokerDatas + ", filterServerTable=" + filterServerTable + ", logicalQueuesInfo=" + logicalQueuesInfo + "]";
+ ", brokerDatas=" + brokerDatas + ", filterServerTable=" + filterServerTable + ", topicQueueMappingInfoTable=" + topicQueueMappingByBroker + "]";
}
}
......@@ -56,7 +56,7 @@ public class ClusterTestRequestProcessor extends DefaultRequestProcessor {
final GetRouteInfoRequestHeader requestHeader =
(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic(), false);
TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
if (topicRouteData != null) {
String orderTopicConf =
this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
......
......@@ -19,7 +19,6 @@ package org.apache.rocketmq.namesrv.processor;
import io.netty.channel.ChannelHandlerContext;
import java.io.UnsupportedEncodingException;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MQVersion;
......@@ -53,9 +52,7 @@ import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponse
import org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerResponseHeader;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfoUnordered;
import org.apache.rocketmq.common.protocol.route.TopicRouteDataNameSrv;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
......@@ -353,9 +350,7 @@ public class DefaultRequestProcessor extends AsyncNettyRequestProcessor implemen
final GetRouteInfoRequestHeader requestHeader =
(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
boolean includeLogicalQueuesInfo = (requestHeader.getSysFlag() & MessageSysFlag.LOGICAL_QUEUE_FLAG) > 0;
TopicRouteDataNameSrv topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic(), includeLogicalQueuesInfo);
TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
if (topicRouteData != null) {
if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
......@@ -365,16 +360,6 @@ public class DefaultRequestProcessor extends AsyncNettyRequestProcessor implemen
topicRouteData.setOrderTopicConf(orderTopicConf);
}
Set<Integer> logicalQueueIdsFilter = requestHeader.getLogicalQueueIdsFilter();
if (logicalQueueIdsFilter != null) {
LogicalQueuesInfoUnordered logicalQueuesInfo = topicRouteData.getLogicalQueuesInfoUnordered();
if (logicalQueuesInfo != null) {
LogicalQueuesInfoUnordered filtered = new LogicalQueuesInfoUnordered(logicalQueueIdsFilter.size());
logicalQueueIdsFilter.forEach(integer -> filtered.put(integer, logicalQueuesInfo.get(integer)));
topicRouteData.setLogicalQueuesInfoUnordered(filtered);
}
}
byte[] content = topicRouteData.encode();
response.setBody(content);
response.setCode(ResponseCode.SUCCESS);
......
......@@ -17,7 +17,6 @@
package org.apache.rocketmq.namesrv.routeinfo;
import io.netty.channel.Channel;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
......@@ -25,19 +24,18 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicQueueMappingInfo;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
......@@ -45,13 +43,10 @@ import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfoUnordered;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteDataNameSrv;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.srvutil.ConcurrentHashMapUtil;
public class RouteInfoManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
......@@ -62,7 +57,8 @@ public class RouteInfoManager {
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
private final ConcurrentMap<String/* topic */, LogicalQueuesInfoUnordered> logicalQueuesInfoTable;
private final HashMap<String/* topic */, Map<String/*brokerName*/, TopicQueueMappingInfo>> topicQueueMappingInfoTable;
public RouteInfoManager() {
this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
......@@ -70,7 +66,7 @@ public class RouteInfoManager {
this.clusterAddrTable = new HashMap<String, Set<String>>(32);
this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
this.filterServerTable = new HashMap<String, List<String>>(256);
this.logicalQueuesInfoTable = new ConcurrentHashMap<>(1024);
this.topicQueueMappingInfoTable = new HashMap<String, Map<String, TopicQueueMappingInfo>>(1024);
}
public byte[] getAllClusterInfo() {
......@@ -158,20 +154,21 @@ public class RouteInfoManager {
|| registerFirst) {
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
Map<String, LogicalQueuesInfo> logicalQueuesInfoMap = topicConfigWrapper.getLogicalQueuesInfoMap();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
if (logicalQueuesInfoMap != null) {
long startTime = System.nanoTime();
for (Map.Entry<String, LogicalQueuesInfo> entry : logicalQueuesInfoMap.entrySet()) {
String topicName = entry.getKey();
LogicalQueuesInfoUnordered logicalQueuesInfo = ConcurrentHashMapUtil.computeIfAbsent(this.logicalQueuesInfoTable, topicName, ignore -> new LogicalQueuesInfoUnordered());
mergeLogicalQueuesInfo(brokerName, topicName, logicalQueuesInfo, entry.getValue());
TopicConfigAndMappingSerializeWrapper mappingSerializeWrapper = TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper);
Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = mappingSerializeWrapper.getTopicQueueMappingInfoMap();
for (Map.Entry<String, TopicQueueMappingInfo> entry : topicQueueMappingInfoMap.entrySet()) {
if (!topicQueueMappingInfoTable.containsKey(entry.getKey())) {
topicQueueMappingInfoTable.put(entry.getKey(), new HashMap<String, TopicQueueMappingInfo>());
}
log.debug("mergeQueueRouteDataTable topic={} time={}ns", System.nanoTime() - startTime);
topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), entry.getValue());
}
}
}
......@@ -403,12 +400,8 @@ public class RouteInfoManager {
}
}
public TopicRouteDataNameSrv pickupTopicRouteData(final String topic) {
return pickupTopicRouteData(topic, false);
}
public TopicRouteDataNameSrv pickupTopicRouteData(final String topic, boolean includeLogicalQueuesInfo) {
TopicRouteDataNameSrv topicRouteData = new TopicRouteDataNameSrv();
public TopicRouteData pickupTopicRouteData(final String topic) {
TopicRouteData topicRouteData = new TopicRouteData();
boolean foundQueueData = false;
boolean foundBrokerData = false;
Set<String> brokerNameSet = new HashSet<String>();
......@@ -417,6 +410,7 @@ public class RouteInfoManager {
HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>();
topicRouteData.setFilterServerTable(filterServerMap);
topicRouteData.setTopicQueueMappingByBroker(this.topicQueueMappingInfoTable.get(topic));
try {
try {
......@@ -456,10 +450,6 @@ public class RouteInfoManager {
log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);
if (foundBrokerData && foundQueueData) {
if (includeLogicalQueuesInfo) {
topicRouteData.setLogicalQueuesInfoUnordered(logicalQueuesInfoTable.get(topic));
}
return topicRouteData;
}
......@@ -790,34 +780,6 @@ public class RouteInfoManager {
return topicList.encode();
}
private static void mergeLogicalQueuesInfo(String brokerName, String topicName,
LogicalQueuesInfoUnordered logicalQueuesInfoInNamesrv,
LogicalQueuesInfo logicalQueuesInfoFromBroker) {
Set<LogicalQueuesInfoUnordered.Key> newKeys = logicalQueuesInfoFromBroker.values()
.stream()
.flatMap(Collection::stream)
.filter(v -> Objects.equals(brokerName, v.getBrokerName()))
.map(v -> new LogicalQueuesInfoUnordered.Key(null, v.getQueueId(), v.getOffsetDelta()))
.collect(Collectors.toSet());
logicalQueuesInfoInNamesrv.values().forEach(m ->
m.values().removeIf(queueRouteData ->
Objects.equals(brokerName, queueRouteData.getBrokerName()) &&
!newKeys.contains(new LogicalQueuesInfoUnordered.Key(null, queueRouteData.getQueueId(), queueRouteData.getOffsetDelta()))));
logicalQueuesInfoFromBroker.forEach((logicalQueueId, queueRouteDataListFromBroker) -> {
if (logicalQueueId == null) {
log.warn("queueRouteDataTable topic {} contains null logicalQueueId: {}", topicName, logicalQueuesInfoFromBroker);
return;
}
queueRouteDataListFromBroker.stream()
.filter(queueRouteDataFromBroker -> Objects.equals(brokerName, queueRouteDataFromBroker.getBrokerName()))
.forEach(queueRouteDataFromBroker ->
ConcurrentHashMapUtil.computeIfAbsent(logicalQueuesInfoInNamesrv, logicalQueueId, ignored -> new ConcurrentHashMap<>(queueRouteDataListFromBroker.size()))
.put(new LogicalQueuesInfoUnordered.Key(brokerName, queueRouteDataFromBroker.getQueueId(), queueRouteDataFromBroker.getOffsetDelta()),
queueRouteDataFromBroker)
);
});
}
}
class BrokerLiveInfo {
......
......@@ -200,7 +200,7 @@ public class DefaultRequestProcessorTest {
.contains(new HashMap.SimpleEntry("broker", broker));
}
@Test
/*@Test
public void testProcessRequest_RegisterBrokerLogicalQueue() throws Exception {
String cluster = "cluster";
String broker1Name = "broker1";
......@@ -299,7 +299,7 @@ public class DefaultRequestProcessorTest {
assertThat(topicRouteDataNameSrv.getLogicalQueuesInfoUnordered()).isEqualTo(logicalQueuesInfoUnordered);
}
}
*/
@Test
public void testProcessRequest_RegisterBrokerWithFilterServer() throws RemotingCommandException,
NoSuchFieldException, IllegalAccessException {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册