提交 7bbc7ddf 编写于 作者: D dongeforever

Add the register logic for mapping topic

上级 3e2c9202
......@@ -95,12 +95,14 @@ import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicQueueMappingInfo;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
import org.apache.rocketmq.common.stats.MomentStatsItem;
......@@ -1058,7 +1060,7 @@ public class BrokerController {
return;
}
TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
TopicConfigAndMappingSerializeWrapper topicConfigSerializeWrapper = new TopicConfigAndMappingSerializeWrapper();
topicConfigSerializeWrapper.setDataVersion(dataVersion);
ConcurrentMap<String, TopicConfig> topicConfigTable = topicConfigList.stream()
......@@ -1079,30 +1081,30 @@ public class BrokerController {
.collect(Collectors.toConcurrentMap(TopicConfig::getTopicName, Function.identity()));
topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable);
String brokerName = this.brokerConfig.getBrokerName();
Map<String, LogicalQueuesInfo> logicalQueuesInfoMap = topicConfigList.stream()
Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = topicConfigList.stream()
.map(TopicConfig::getTopicName)
.map(topicName -> Optional.ofNullable(this.topicConfigManager.selectLogicalQueuesInfo(topicName))
.map(info -> {
info.readLock().lock();
try {
return new AbstractMap.SimpleImmutableEntry<>(topicName, new LogicalQueuesInfoInBroker(info, data -> Objects.equals(data.getBrokerName(), brokerName)));
} finally {
info.readLock().unlock();
}
})
.map(topicName -> Optional.ofNullable(this.topicQueueMappingManager.getTopicQueueMapping(topicName))
.map(info -> new AbstractMap.SimpleImmutableEntry<>(topicName, info.clone4register()))
.orElse(null))
.filter(Objects::nonNull)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
if (!logicalQueuesInfoMap.isEmpty()) {
topicConfigSerializeWrapper.setLogicalQueuesInfoMap(logicalQueuesInfoMap);
if (!topicQueueMappingInfoMap.isEmpty()) {
topicConfigSerializeWrapper.setTopicQueueMappingInfoMap(topicQueueMappingInfoMap);
}
doRegisterBrokerAll(true, false, topicConfigSerializeWrapper);
}
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
TopicConfigAndMappingSerializeWrapper topicConfigWrapper = new TopicConfigAndMappingSerializeWrapper();
topicConfigWrapper.setDataVersion(this.getTopicConfigManager().getDataVersion());
topicConfigWrapper.setTopicConfigTable(this.getTopicConfigManager().getTopicConfigTable());
topicConfigWrapper.setTopicQueueMappingInfoMap(this.getTopicQueueMappingManager().getTopicQueueMappingTable().entrySet().stream().map(
entry -> new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), entry.getValue().clone4register())
).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
......@@ -1121,9 +1123,9 @@ public class BrokerController {
}
}
topicConfigWrapper.setTopicConfigTable(topicConfigTable);
topicConfigWrapper.setLogicalQueuesInfoMap(logicalQueuesInfoMap);
}
if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
......@@ -1134,7 +1136,7 @@ public class BrokerController {
}
private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
TopicConfigSerializeWrapper topicConfigWrapper) {
TopicConfigAndMappingSerializeWrapper topicConfigWrapper) {
List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
......
......@@ -36,7 +36,7 @@ public class BrokerPathConfigHelper {
}
public static String getTopicQueueMappingPath(final String rootDir) {
return rootDir + File.separator + "config" + File.separator + "topicqueuemapping.json";
return rootDir + File.separator + "config" + File.separator + "topicQueueMapping.json";
}
public static String getConsumerOffsetPath(final String rootDir) {
......
......@@ -39,6 +39,7 @@ import org.apache.rocketmq.common.protocol.body.ConsumerOffsetSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionRequestHeader;
......@@ -140,7 +141,7 @@ public class BrokerOuterAPI {
requestHeader.setCompressed(compressed);
RegisterBrokerBody requestBody = new RegisterBrokerBody();
requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
requestBody.setTopicConfigSerializeWrapper(TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper));
requestBody.setFilterServerList(filterServerList);
final byte[] body = requestBody.encode(compressed);
final int bodyCrc32 = UtilAll.crc32(body);
......
......@@ -441,8 +441,6 @@ public class TopicConfigManager extends ConfigManager {
public TopicConfigSerializeWrapper buildTopicConfigSerializeWrapper() {
TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
topicConfigSerializeWrapper.setTopicConfigTable(this.topicConfigTable);
String brokerName = this.brokerController.getBrokerConfig().getBrokerName();
topicConfigSerializeWrapper.setLogicalQueuesInfoMap(this.logicalQueuesInfoTable.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> new LogicalQueuesInfoInBroker(e.getValue(), data -> Objects.equals(data.getBrokerName(), brokerName)))));
topicConfigSerializeWrapper.setDataVersion(this.dataVersion);
return topicConfigSerializeWrapper;
}
......@@ -474,7 +472,6 @@ public class TopicConfigManager extends ConfigManager {
public String encode(final boolean prettyFormat) {
TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
topicConfigSerializeWrapper.setTopicConfigTable(this.topicConfigTable);
topicConfigSerializeWrapper.setLogicalQueuesInfoMap(this.logicalQueuesInfoTable.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> new LogicalQueuesInfoInBroker(e.getValue()))));
topicConfigSerializeWrapper.setDataVersion(this.dataVersion);
return topicConfigSerializeWrapper.toJson(prettyFormat);
}
......
......@@ -37,6 +37,7 @@ public class TopicQueueMappingManager extends ConfigManager {
private static final long LOCK_TIMEOUT_MILLIS = 3000;
private transient final Lock lock = new ReentrantLock();
//this data version should be equal to the TopicConfigManager
private final DataVersion dataVersion = new DataVersion();
private transient BrokerController brokerController;
......@@ -85,6 +86,10 @@ public class TopicQueueMappingManager extends ConfigManager {
}
}
public ConcurrentMap<String, TopicQueueMappingInfo> getTopicQueueMappingTable() {
return topicQueueMappingTable;
}
public DataVersion getDataVersion() {
return dataVersion;
}
......
......@@ -16,7 +16,7 @@
*/
package org.apache.rocketmq.common;
public class TopicConfigAndQueueMapping extends TopicConfig {
public class TopicConfigAndQueueMapping {
private TopicConfig topicConfig;
private TopicQueueMappingInfo topicQueueMappingInfo;
......
......@@ -16,39 +16,91 @@
*/
package org.apache.rocketmq.common;
import com.google.common.collect.ImmutableList;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class TopicQueueMappingInfo extends RemotingSerializable {
public static final int LEVEL_0 = 0;
public static final int LEVEL_1 = 1;
private String topic; // redundant field
private int totalQueues;
private String bname; //identify the host name
//the newest mapping is in current broker
private Map<Integer/*global id*/, List<LogicQueueMappingItem>> hostedQueues = new HashMap<Integer, List<LogicQueueMappingItem>>();
private String bname; //identify the hosted broker name
// the mapping info in current broker, do not register to nameserver
private ConcurrentMap<Integer/*global id*/, ImmutableList<LogicQueueMappingItem>> hostedQueues = new ConcurrentHashMap<Integer, ImmutableList<LogicQueueMappingItem>>();
//register to broker to construct the route
private ConcurrentMap<Integer, Integer> currIdMap = new ConcurrentHashMap<Integer, Integer>();
//register to broker to help detect remapping failure
private ConcurrentMap<Integer, Integer> prevIdMap = new ConcurrentHashMap<Integer, Integer>();
public TopicQueueMappingInfo(String topic, int totalQueues, String bname) {
this.topic = topic;
this.totalQueues = totalQueues;
this.bname = bname;
buildIdMap();
}
public boolean putMappingInfo(Integer globalId, List<LogicQueueMappingItem> mappingInfo) {
public boolean putMappingInfo(Integer globalId, ImmutableList<LogicQueueMappingItem> mappingInfo) {
if (mappingInfo.isEmpty()) {
return true;
}
hostedQueues.put(globalId, mappingInfo);
buildIdMap();
return true;
}
public void buildIdMap() {
this.currIdMap = buildIdMap(LEVEL_0);
this.prevIdMap = buildIdMap(LEVEL_1);
}
public ConcurrentMap<Integer, Integer> buildIdMap(int level) {
//level 0 means current leader in this broker
//level 1 means previous leader in this broker
assert level == LEVEL_0 || level == LEVEL_1;
if (hostedQueues == null || hostedQueues.isEmpty()) {
return new ConcurrentHashMap<Integer, Integer>();
}
ConcurrentMap<Integer, Integer> tmpIdMap = new ConcurrentHashMap<Integer, Integer>();
for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>> entry: hostedQueues.entrySet()) {
Integer globalId = entry.getKey();
ImmutableList<LogicQueueMappingItem> items = entry.getValue();
if (level == LEVEL_0
&& items.size() >= 1) {
LogicQueueMappingItem curr = items.get(items.size() - 1);
if (bname.equals(curr.getBname())) {
tmpIdMap.put(curr.getQueueId(), globalId);
}
} else if (level == LEVEL_1
&& items.size() >= 2) {
LogicQueueMappingItem prev = items.get(items.size() - 1);
if (bname.equals(prev.getBname())) {
tmpIdMap.put(prev.getQueueId(), globalId);
}
}
}
return tmpIdMap;
}
public List<LogicQueueMappingItem> getMappingInfo(Integer globalId) {
return hostedQueues.get(globalId);
}
public TopicQueueMappingInfo clone4register() {
TopicQueueMappingInfo topicQueueMappingInfo = new TopicQueueMappingInfo(this.topic, this.totalQueues, this.bname);
topicQueueMappingInfo.currIdMap = this.buildIdMap(LEVEL_0);
topicQueueMappingInfo.prevIdMap = this.buildIdMap(LEVEL_1);
return topicQueueMappingInfo;
}
public int getTotalQueues() {
return totalQueues;
}
......
......@@ -33,6 +33,7 @@ import java.util.zip.InflaterInputStream;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicQueueMappingInfo;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
......@@ -41,7 +42,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
public class RegisterBrokerBody extends RemotingSerializable {
private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
private TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
private TopicConfigAndMappingSerializeWrapper topicConfigSerializeWrapper = new TopicConfigAndMappingSerializeWrapper();
private List<String> filterServerList = new ArrayList<String>();
public byte[] encode(boolean compress) {
......@@ -82,6 +83,20 @@ public class RegisterBrokerBody extends RemotingSerializable {
// write filter server list json
outputStream.write(buffer);
//write the topic queue mapping
Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = topicConfigSerializeWrapper.getTopicQueueMappingInfoMap();
if (topicQueueMappingInfoMap == null) {
//as the place holder
topicQueueMappingInfoMap = new ConcurrentHashMap<String, TopicQueueMappingInfo>();
}
outputStream.write(convertIntToByteArray(topicQueueMappingInfoMap.size()));
for (TopicQueueMappingInfo info: topicQueueMappingInfoMap.values()) {
buffer = JSON.toJSONString(info).getBytes(MixAll.DEFAULT_CHARSET);
outputStream.write(convertIntToByteArray(buffer.length));
// write filter server list json
outputStream.write(buffer);
}
outputStream.finish();
long interval = System.currentTimeMillis() - start;
if (interval > 50) {
......@@ -134,6 +149,17 @@ public class RegisterBrokerBody extends RemotingSerializable {
}
registerBrokerBody.setFilterServerList(filterServerList);
int topicQueueMappingNum = readInt(inflaterInputStream);
Map<String/* topic */, TopicQueueMappingInfo> topicQueueMappingInfoMap = new ConcurrentHashMap<String, TopicQueueMappingInfo>();
for (int i = 0; i < topicQueueMappingNum; i++) {
int mappingJsonLen = readInt(inflaterInputStream);
byte[] buffer = readBytes(inflaterInputStream, mappingJsonLen);
TopicQueueMappingInfo info = TopicQueueMappingInfo.decode(buffer, TopicQueueMappingInfo.class);
topicQueueMappingInfoMap.put(info.getTopic(), info);
}
registerBrokerBody.getTopicConfigSerializeWrapper().setTopicQueueMappingInfoMap(topicQueueMappingInfoMap);
long interval = System.currentTimeMillis() - start;
if (interval > 50) {
LOGGER.info("Decompressing takes {}ms", interval);
......@@ -167,11 +193,11 @@ public class RegisterBrokerBody extends RemotingSerializable {
return byteBuffer.getInt();
}
public TopicConfigSerializeWrapper getTopicConfigSerializeWrapper() {
public TopicConfigAndMappingSerializeWrapper getTopicConfigSerializeWrapper() {
return topicConfigSerializeWrapper;
}
public void setTopicConfigSerializeWrapper(TopicConfigSerializeWrapper topicConfigSerializeWrapper) {
public void setTopicConfigSerializeWrapper(TopicConfigAndMappingSerializeWrapper topicConfigSerializeWrapper) {
this.topicConfigSerializeWrapper = topicConfigSerializeWrapper;
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.body;
import org.apache.rocketmq.common.TopicQueueMappingInfo;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class TopicConfigAndMappingSerializeWrapper extends TopicConfigSerializeWrapper {
private Map<String/* topic */, TopicQueueMappingInfo> topicQueueMappingInfoMap = new ConcurrentHashMap<String, TopicQueueMappingInfo>();
public Map<String, TopicQueueMappingInfo> getTopicQueueMappingInfoMap() {
return topicQueueMappingInfoMap;
}
public void setTopicQueueMappingInfoMap(Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap) {
this.topicQueueMappingInfoMap = topicQueueMappingInfoMap;
}
public static TopicConfigAndMappingSerializeWrapper from(TopicConfigSerializeWrapper wrapper) {
if (wrapper instanceof TopicConfigAndMappingSerializeWrapper) {
return (TopicConfigAndMappingSerializeWrapper)wrapper;
}
TopicConfigAndMappingSerializeWrapper mappingSerializeWrapper = new TopicConfigAndMappingSerializeWrapper();
mappingSerializeWrapper.setDataVersion(wrapper.getDataVersion());
mappingSerializeWrapper.setTopicConfigTable(wrapper.getTopicConfigTable());
return mappingSerializeWrapper;
}
}
......@@ -22,13 +22,11 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
public class TopicConfigSerializeWrapper extends RemotingSerializable {
private ConcurrentMap<String, TopicConfig> topicConfigTable =
new ConcurrentHashMap<String, TopicConfig>();
private Map<String/* topic */, LogicalQueuesInfo> logicalQueuesInfoMap;
private DataVersion dataVersion = new DataVersion();
public ConcurrentMap<String, TopicConfig> getTopicConfigTable() {
......@@ -46,12 +44,4 @@ public class TopicConfigSerializeWrapper extends RemotingSerializable {
public void setDataVersion(DataVersion dataVersion) {
this.dataVersion = dataVersion;
}
public Map<String, LogicalQueuesInfo> getLogicalQueuesInfoMap() {
return logicalQueuesInfoMap;
}
public void setLogicalQueuesInfoMap(Map<String, LogicalQueuesInfo> logicalQueuesInfoMap) {
this.logicalQueuesInfoMap = logicalQueuesInfoMap;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册