提交 25a588b8 编写于 作者: D dongeforever

Init the remapping command

上级 45946d47
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common;
import com.google.common.collect.ImmutableList;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class TopicQueueMappingOne extends RemotingSerializable {
String topic; // redundant field
String bname; //identify the hosted broker name
Integer globalId;
ImmutableList<LogicQueueMappingItem> items;
public TopicQueueMappingOne(String topic, String bname, Integer globalId, ImmutableList<LogicQueueMappingItem> items) {
this.topic = topic;
this.bname = bname;
this.globalId = globalId;
this.items = items;
}
public String getTopic() {
return topic;
}
public String getBname() {
return bname;
}
public Integer getGlobalId() {
return globalId;
}
public ImmutableList<LogicQueueMappingItem> getItems() {
return items;
}
}
......@@ -103,7 +103,7 @@ public class TopicQueueMappingUtils {
return new AbstractMap.SimpleImmutableEntry<Long, Integer>(epoch, queueNum);
}
public static Map<Integer, ImmutableList<LogicQueueMappingItem>> buildMappingItems(List<TopicQueueMappingDetail> mappingDetailList, boolean replace) {
public static Map<Integer, TopicQueueMappingOne> buildMappingItems(List<TopicQueueMappingDetail> mappingDetailList, boolean replace) {
Collections.sort(mappingDetailList, new Comparator<TopicQueueMappingDetail>() {
@Override
public int compare(TopicQueueMappingDetail o1, TopicQueueMappingDetail o2) {
......@@ -111,7 +111,7 @@ public class TopicQueueMappingUtils {
}
});
Map<Integer, ImmutableList<LogicQueueMappingItem>> globalIdMap = new HashMap<Integer, ImmutableList<LogicQueueMappingItem>>();
Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<Integer, TopicQueueMappingOne>();
for (TopicQueueMappingDetail mappingDetail : mappingDetailList) {
for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>> entry : mappingDetail.getHostedQueues().entrySet()) {
Integer globalid = entry.getKey();
......@@ -125,7 +125,7 @@ public class TopicQueueMappingUtils {
throw new RuntimeException(String.format("The queue id is duplicated in broker %s %s", leaderBrokerName, mappingDetail.getBname()));
}
} else {
globalIdMap.put(globalid, entry.getValue());
globalIdMap.put(globalid, new TopicQueueMappingOne(mappingDetail.topic, mappingDetail.bname, globalid, entry.getValue()));
}
}
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.topic;
import com.google.common.collect.ImmutableList;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.LogicQueueMappingItem;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.TopicQueueMappingDetail;
import org.apache.rocketmq.common.TopicQueueMappingOne;
import org.apache.rocketmq.common.TopicQueueMappingUtils;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.rpc.ClientMetadata;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
import java.util.AbstractMap;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
public class RemappingStaticTopicSubCommand implements SubCommand {
@Override
public String commandName() {
return "updateStaticTopic";
}
@Override
public String commandDesc() {
return "Update or create static topic, which has fixed number of queues";
}
@Override
public Options buildCommandlineOptions(Options options) {
OptionGroup optionGroup = new OptionGroup();
Option opt = null;
opt = new Option("c", "clusters", true, "remapping static topic to clusters, comma separated");
optionGroup.addOption(opt);
opt = new Option("b", "brokers", true, "remapping static topic to brokers, comma separated");
optionGroup.addOption(opt);
optionGroup.setRequired(true);
options.addOptionGroup(optionGroup);
opt = new Option("t", "topic", true, "topic name");
opt.setRequired(true);
options.addOption(opt);
return options;
}
@Override
public void execute(final CommandLine commandLine, final Options options,
RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
ClientMetadata clientMetadata = new ClientMetadata();
Map<String, TopicConfigAndQueueMapping> existedTopicConfigMap = new HashMap<>();
Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<>();
Set<String> brokers = new HashSet<>();
Map.Entry<Long, Integer> maxEpochAndNum = null;
try {
if ((!commandLine.hasOption("b") && !commandLine.hasOption('c'))
|| !commandLine.hasOption('t')) {
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
return;
}
String topic = commandLine.getOptionValue('t').trim();
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
if (clusterInfo == null
|| clusterInfo.getClusterAddrTable().isEmpty()) {
throw new RuntimeException("The Cluster info is empty");
}
clientMetadata.refreshClusterInfo(clusterInfo);
if (commandLine.hasOption("b")) {
String brokerStrs = commandLine.getOptionValue("b").trim();
for (String broker: brokerStrs.split(",")) {
brokers.add(broker.trim());
}
} else if (commandLine.hasOption("c")) {
String clusters = commandLine.getOptionValue('c').trim();
for (String cluster : clusters.split(",")) {
cluster = cluster.trim();
if (clusterInfo.getClusterAddrTable().get(cluster) != null) {
brokers.addAll(clusterInfo.getClusterAddrTable().get(cluster));
}
}
}
if (brokers.isEmpty()) {
throw new RuntimeException("Find none brokers");
}
for (String broker : brokers) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
if (addr == null) {
throw new RuntimeException("Can't find addr for broker " + broker);
}
}
//get the existed topic config and mapping
TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic);
clientMetadata.freshTopicRoute(topic, routeData);
if (routeData != null
&& !routeData.getQueueDatas().isEmpty()) {
for (QueueData queueData: routeData.getQueueDatas()) {
String bname = queueData.getBrokerName();
String addr = clientMetadata.findMasterBrokerAddr(bname);
TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic);
//allow the config is null
if (mapping != null) {
existedTopicConfigMap.put(bname, mapping);
}
}
}
if (existedTopicConfigMap.isEmpty()) {
throw new RuntimeException("No topic route to do the remapping");
}
//make sure it it not null
existedTopicConfigMap.forEach((key, value) -> {
if (value.getMappingDetail() != null) {
throw new RuntimeException("Mapping info should be null in broker " + key);
}
});
//make sure the detail is not dirty
existedTopicConfigMap.forEach((key, value) -> {
if (!key.equals(value.getMappingDetail().getBname())) {
throw new RuntimeException(String.format("The broker name is not equal %s != %s ", key, value.getMappingDetail().getBname()));
}
if (value.getMappingDetail().isDirty()) {
throw new RuntimeException("The mapping info is dirty in broker " + value.getMappingDetail().getBname());
}
});
List<TopicQueueMappingDetail> detailList = existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList());
//check the epoch and qnum
maxEpochAndNum = TopicQueueMappingUtils.findMaxEpochAndQueueNum(detailList);
final Map.Entry<Long, Integer> tmpMaxEpochAndNum = maxEpochAndNum;
detailList.forEach( mappingDetail -> {
if (tmpMaxEpochAndNum.getKey() != mappingDetail.getEpoch()) {
throw new RuntimeException(String.format("epoch dose not match %d != %d in %s", tmpMaxEpochAndNum.getKey(), mappingDetail.getEpoch(), mappingDetail.getBname()));
}
if (tmpMaxEpochAndNum.getValue() != mappingDetail.getTotalQueues()) {
throw new RuntimeException(String.format("total queue number dose not match %d != %d in %s", tmpMaxEpochAndNum.getValue(), mappingDetail.getTotalQueues(), mappingDetail.getBname()));
}
});
globalIdMap = TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(detailList), false);
if (maxEpochAndNum.getValue() != globalIdMap.size()) {
throw new RuntimeException(String.format("The total queue number in config dose not match the real hosted queues %d != %d", maxEpochAndNum.getValue(), globalIdMap.size()));
}
for (int i = 0; i < maxEpochAndNum.getValue(); i++) {
if (!globalIdMap.containsKey(i)) {
throw new RuntimeException(String.format("The queue number %s is not in globalIdMap", i));
}
}
//the check is ok, now do the mapping allocation
int maxNum = maxEpochAndNum.getValue();
TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap<>(), brokers.stream().collect(Collectors.toMap( x -> x, x -> 0)));
allocator.upToNum(maxNum);
Map<String, Integer> expectedBrokerNumMap = allocator.getBrokerNumMap();
Queue<Integer> waitAssignQueues = new ArrayDeque<Integer>();
Map<Integer, String> expectedIdToBroker = new HashMap<>();
//the following logic will make sure that, for one broker, only "take in" or "take out" queues
//It can't, take in some queues but alse take out some queues.
globalIdMap.forEach((queueId, mappingOne) -> {
String leaderBroker = mappingOne.getBname();
if (expectedBrokerNumMap.containsKey(leaderBroker)) {
if (expectedBrokerNumMap.get(leaderBroker) > 0) {
expectedIdToBroker.put(queueId, leaderBroker);
expectedBrokerNumMap.put(leaderBroker, expectedBrokerNumMap.get(leaderBroker) - 1);
} else {
waitAssignQueues.add(queueId);
expectedBrokerNumMap.remove(leaderBroker);
}
} else {
waitAssignQueues.add(queueId);
}
});
expectedBrokerNumMap.forEach((broker, queueNum) -> {
for (int i = 0; i < queueNum; i++) {
expectedIdToBroker.put(waitAssignQueues.poll(), broker);
}
});
Set<Broker>
//Now construct the remapping info
//construct the topic configAndMapping
long epoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis());
for (Map.Entry<Integer, String> e : expectedIdToBroker.entrySet()) {
Integer queueId = e.getKey();
String broker = e.getValue();
if (globalIdMap.containsKey(queueId)) {
//ignore the exited
continue;
}
TopicConfigAndQueueMapping configMapping;
if (!existedTopicConfigMap.containsKey(broker)) {
TopicConfig topicConfig = new TopicConfig(topic, 1, 1);
TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail(topic, 0, broker, epoch);
configMapping = new TopicConfigAndQueueMapping(topicConfig, mappingDetail);
existedTopicConfigMap.put(broker, configMapping);
} else {
configMapping = existedTopicConfigMap.get(broker);
configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
configMapping.getMappingDetail().setEpoch(epoch);
configMapping.getMappingDetail().setTotalQueues(0);
}
LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0, -1, -1, -1);
configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem));
}
//If some succeed, and others fail, it will cause inconsistent data
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : existedTopicConfigMap.entrySet()) {
String broker = entry.getKey();
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = entry.getValue();
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
}
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
}
}
......@@ -26,6 +26,7 @@ import org.apache.rocketmq.common.LogicQueueMappingItem;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.TopicQueueMappingDetail;
import org.apache.rocketmq.common.TopicQueueMappingOne;
import org.apache.rocketmq.common.TopicQueueMappingUtils;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.QueueData;
......@@ -64,7 +65,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
Option opt = null;
opt = new Option("c", "clusterName", true, "create topic to which cluster");
opt = new Option("c", "clusters", true, "create topic to clusters, comma separated");
optionGroup.addOption(opt);
optionGroup.setRequired(true);
......@@ -90,7 +91,9 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
ClientMetadata clientMetadata = new ClientMetadata();
Map<String, TopicConfigAndQueueMapping> existedTopicConfigMap = new HashMap<>();
Map<Integer, ImmutableList<LogicQueueMappingItem>> globalIdMap = new HashMap<>();
Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<>();
Set<String> brokers = new HashSet<>();
try {
if (!commandLine.hasOption('t')
|| !commandLine.hasOption('c')
......@@ -108,7 +111,6 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
} else {
clientMetadata.refreshClusterInfo(clusterInfo);
}
Set<String> brokers = new HashSet<>();
for (String cluster : clusters.split(",")) {
cluster = cluster.trim();
if (clusterInfo.getClusterAddrTable().get(cluster) != null) {
......@@ -118,6 +120,12 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
if (brokers.isEmpty()) {
throw new RuntimeException("Find none brokers for " + clusters);
}
for (String broker : brokers) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
if (addr == null) {
throw new RuntimeException("Can't find addr for broker " + broker);
}
}
//get the existed topic config and mapping
TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic);
......@@ -188,7 +196,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
Map<String, Integer> brokerNumMap = brokers.stream().collect(Collectors.toMap( x -> x, x -> 0));
Map<Integer, String> idToBroker = new HashMap<>();
globalIdMap.forEach((key, value) -> {
String leaderbroker = TopicQueueMappingUtils.getLeaderBroker(value);
String leaderbroker = value.getBname();
idToBroker.put(key, leaderbroker);
if (!brokerNumMap.containsKey(leaderbroker)) {
brokerNumMap.put(leaderbroker, 1);
......@@ -204,27 +212,29 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
long epoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis());
for (Map.Entry<Integer, String> e : newIdToBroker.entrySet()) {
Integer queueId = e.getKey();
String value = e.getValue();
String broker = e.getValue();
if (globalIdMap.containsKey(queueId)) {
//ignore the exited
continue;
}
TopicConfigAndQueueMapping configMapping;
if (!existedTopicConfigMap.containsKey(value)) {
TopicConfig topicConfig = new TopicConfig(topic, 1, 1);
TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail(topic, queueNum, value, epoch);
configMapping = new TopicConfigAndQueueMapping(topicConfig, mappingDetail);
if (!existedTopicConfigMap.containsKey(broker)) {
configMapping = new TopicConfigAndQueueMapping(new TopicConfig(topic), new TopicQueueMappingDetail(topic, 0, broker, -1));
configMapping.setWriteQueueNums(1);
configMapping.setReadQueueNums(1);
existedTopicConfigMap.put(broker, configMapping);
} else {
configMapping = existedTopicConfigMap.get(value);
configMapping = existedTopicConfigMap.get(broker);
configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
configMapping.getMappingDetail().setEpoch(epoch);
configMapping.getMappingDetail().setTotalQueues(queueNum);
}
LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, value, 0, 0, -1, -1, -1);
LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0, -1, -1, -1);
configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem));
}
existedTopicConfigMap.values().forEach( configMapping -> {
configMapping.getMappingDetail().setEpoch(epoch);
configMapping.getMappingDetail().setTotalQueues(queueNum);
});
//If some succeed, and others fail, it will cause inconsistent data
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : existedTopicConfigMap.entrySet()) {
String broker = entry.getKey();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册