提交 f225fd05 编写于 作者: D dongeforever

Polish the code for command

上级 165e133a
......@@ -116,13 +116,12 @@ public class TopicQueueMappingUtils {
return detailList;
}
public static Map.Entry<Long, Integer> validConsistenceOfTopicConfigAndQueueMapping(Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
public static Map.Entry<Long, Integer> checkConsistenceOfTopicConfigAndQueueMapping(String topic, Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
if (brokerConfigMap == null
|| brokerConfigMap.isEmpty()) {
return null;
}
//make sure it it not null
String topic = null;
long maxEpoch = -1;
int maxNum = -1;
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
......@@ -143,9 +142,7 @@ public class TopicQueueMappingUtils {
}
if (topic != null
&& !topic.equals(mappingDetail.getTopic())) {
throw new RuntimeException("The topic name is inconsistent in broker " + broker);
} else {
topic = mappingDetail.getTopic();
throw new RuntimeException("The topic name is not match for broker " + broker);
}
if (maxEpoch != -1
......@@ -165,7 +162,7 @@ public class TopicQueueMappingUtils {
return new AbstractMap.SimpleEntry<Long, Integer>(maxEpoch, maxNum);
}
public static Map<Integer, TopicQueueMappingOne> buildMappingItems(List<TopicQueueMappingDetail> mappingDetailList, boolean replace, boolean checkConsistence) {
public static Map<Integer, TopicQueueMappingOne> checkAndBuildMappingItems(List<TopicQueueMappingDetail> mappingDetailList, boolean replace, boolean checkConsistence) {
Collections.sort(mappingDetailList, new Comparator<TopicQueueMappingDetail>() {
@Override
public int compare(TopicQueueMappingDetail o1, TopicQueueMappingDetail o2) {
......@@ -216,10 +213,14 @@ public class TopicQueueMappingUtils {
return items.get(items.size() - 1);
}
public static String writeToTemp(TopicRemappingDetailWrapper wrapper, String suffix) {
public static String writeToTemp(TopicRemappingDetailWrapper wrapper, boolean after) {
String topic = wrapper.getTopic();
String data = wrapper.toJson();
String fileName = System.getProperty("java.io.tmpdir") + File.separator + topic + "-" + wrapper.getEpoch() + "-" + suffix;
String suffix = TopicRemappingDetailWrapper.SUFFIX_BEFORE;
if (after) {
suffix = TopicRemappingDetailWrapper.SUFFIX_AFTER;
}
String fileName = System.getProperty("java.io.tmpdir") + File.separator + topic + "-" + wrapper.getEpoch() + suffix;
try {
MixAll.string2File(data, fileName);
return fileName;
......
......@@ -3,12 +3,17 @@ package org.apache.rocketmq.common.statictopic;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
public class TopicRemappingDetailWrapper extends RemotingSerializable {
public static final String TYPE_CREATE_OR_UPDATE = "CREATE_OR_UPDATE";
public static final String TYPE_REMAPPING = "REMAPPING";
public static final String SUFFIX_BEFORE = ".before";
public static final String SUFFIX_AFTER = ".after";
private final String topic;
private final String type;
......@@ -17,6 +22,10 @@ public class TopicRemappingDetailWrapper extends RemotingSerializable {
private Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>();
private Set<String> brokerToMapIn = new HashSet<String>();
private Set<String> brokerToMapOut = new HashSet<String>();
public TopicRemappingDetailWrapper(String topic, String type, long epoch, Map<Integer, String> expectedIdToBroker, Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
this.topic = topic;
this.type = type;
......@@ -44,4 +53,20 @@ public class TopicRemappingDetailWrapper extends RemotingSerializable {
public Map<String, TopicConfigAndQueueMapping> getBrokerConfigMap() {
return brokerConfigMap;
}
public Set<String> getBrokerToMapIn() {
return brokerToMapIn;
}
public void setBrokerToMapIn(Set<String> brokerToMapIn) {
this.brokerToMapIn = brokerToMapIn;
}
public Set<String> getBrokerToMapOut() {
return brokerToMapOut;
}
public void setBrokerToMapOut(Set<String> brokerToMapOut) {
this.brokerToMapOut = brokerToMapOut;
}
}
......@@ -21,9 +21,9 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.common.admin.TopicOffset;
......@@ -33,6 +33,7 @@ import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.rpc.ClientMetadata;
import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
......@@ -79,24 +80,129 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
opt = new Option("t", "topic", true, "topic name");
opt.setRequired(true);
options.addOption(opt);
opt = new Option("f", "mapFile", true, "The map file name");
opt.setRequired(false);
options.addOption(opt);
return options;
}
public void executeFromFile(final CommandLine commandLine, final Options options,
RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
ClientMetadata clientMetadata = new ClientMetadata();
try {
String topic = commandLine.getOptionValue('t').trim();
String mapFileName = commandLine.getOptionValue('f').trim();
String mapData = MixAll.file2String(mapFileName);
TopicRemappingDetailWrapper wrapper = TopicRemappingDetailWrapper.decode(mapData.getBytes(), TopicRemappingDetailWrapper.class);
//double check the config
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, wrapper.getBrokerConfigMap());
TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(wrapper.getBrokerConfigMap().values())), false, true);
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
if (clusterInfo == null
|| clusterInfo.getClusterAddrTable().isEmpty()) {
throw new RuntimeException("The Cluster info is empty");
}
clientMetadata.refreshClusterInfo(clusterInfo);
doRemapping(topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt);
return;
}catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
}
public void doRemapping(String topic, Set<String> brokersToMapIn, Set<String> brokersToMapOut, Map<String, TopicConfigAndQueueMapping> brokerConfigMap,
ClientMetadata clientMetadata, DefaultMQAdminExt defaultMQAdminExt) throws Exception {
// now do the remapping
//Step1: let the new leader can be write without the logicOffset
for (String broker: brokersToMapIn) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
}
//Step2: forbid the write of old leader
for (String broker: brokersToMapOut) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
}
//Step3: decide the logic offset
for (String broker: brokersToMapOut) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicStatsTable statsTable = defaultMQAdminExt.examineTopicStats(addr, topic);
TopicConfigAndQueueMapping mapOutConfig = brokerConfigMap.get(broker);
for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>> entry : mapOutConfig.getMappingDetail().getHostedQueues().entrySet()) {
ImmutableList<LogicQueueMappingItem> items = entry.getValue();
Integer globalId = entry.getKey();
if (items.size() < 2) {
continue;
}
LogicQueueMappingItem newLeader = items.get(items.size() - 1);
LogicQueueMappingItem oldLeader = items.get(items.size() - 2);
if (newLeader.getLogicOffset() > 0) {
continue;
}
TopicOffset topicOffset = statsTable.getOffsetTable().get(new MessageQueue(topic, oldLeader.getBname(), oldLeader.getQueueId()));
if (topicOffset == null) {
throw new RuntimeException("Cannot get the max offset for old leader " + oldLeader);
}
//TODO check the max offset, will it return -1?
if (topicOffset.getMaxOffset() < oldLeader.getStartOffset()) {
throw new RuntimeException("The max offset is smaller then the start offset " + oldLeader + " " + topicOffset.getMaxOffset());
}
newLeader.setLogicOffset(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset() + 10000));
TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(newLeader.getBname());
//fresh the new leader
mapInConfig.getMappingDetail().putMappingInfo(globalId, items);
}
}
//Step4: write to the new leader with logic offset
for (String broker: brokersToMapIn) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
}
}
@Override
public void execute(final CommandLine commandLine, final Options options,
RPCHook rpcHook) throws SubCommandException {
RPCHook rpcHook) throws SubCommandException {
if (!commandLine.hasOption('t')) {
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
return;
}
if (commandLine.hasOption("f")) {
executeFromFile(commandLine, options, rpcHook);
return;
}
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
ClientMetadata clientMetadata = new ClientMetadata();
Map<String, TopicConfigAndQueueMapping> existedTopicConfigMap = new HashMap<>();
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<>();
Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<>();
Set<String> brokers = new HashSet<>();
Map.Entry<Long, Integer> maxEpochAndNum = null;
Set<String> targetBrokers = new HashSet<>();
try {
if ((!commandLine.hasOption("b") && !commandLine.hasOption('c'))
|| !commandLine.hasOption('t')) {
if ((!commandLine.hasOption("b") && !commandLine.hasOption('c'))) {
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
return;
}
......@@ -108,84 +214,67 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
throw new RuntimeException("The Cluster info is empty");
}
clientMetadata.refreshClusterInfo(clusterInfo);
if (commandLine.hasOption("b")) {
String brokerStrs = commandLine.getOptionValue("b").trim();
for (String broker: brokerStrs.split(",")) {
brokers.add(broker.trim());
}
} else if (commandLine.hasOption("c")) {
String clusters = commandLine.getOptionValue('c').trim();
for (String cluster : clusters.split(",")) {
cluster = cluster.trim();
if (clusterInfo.getClusterAddrTable().get(cluster) != null) {
brokers.addAll(clusterInfo.getClusterAddrTable().get(cluster));
{
if (commandLine.hasOption("b")) {
String brokerStrs = commandLine.getOptionValue("b").trim();
for (String broker: brokerStrs.split(",")) {
targetBrokers.add(broker.trim());
}
} else if (commandLine.hasOption("c")) {
String clusters = commandLine.getOptionValue('c').trim();
for (String cluster : clusters.split(",")) {
cluster = cluster.trim();
if (clusterInfo.getClusterAddrTable().get(cluster) != null) {
targetBrokers.addAll(clusterInfo.getClusterAddrTable().get(cluster));
}
}
}
}
if (brokers.isEmpty()) {
throw new RuntimeException("Find none brokers");
}
for (String broker : brokers) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
if (addr == null) {
throw new RuntimeException("Can't find addr for broker " + broker);
if (targetBrokers.isEmpty()) {
throw new RuntimeException("Find none brokers, do nothing");
}
for (String broker : targetBrokers) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
if (addr == null) {
throw new RuntimeException("Can't find addr for broker " + broker);
}
}
}
//get the existed topic config and mapping
TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic);
clientMetadata.freshTopicRoute(topic, routeData);
if (routeData != null
&& !routeData.getQueueDatas().isEmpty()) {
for (QueueData queueData: routeData.getQueueDatas()) {
String bname = queueData.getBrokerName();
String addr = clientMetadata.findMasterBrokerAddr(bname);
TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic);
//allow the config is null
if (mapping != null) {
existedTopicConfigMap.put(bname, mapping);
{
TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic);
clientMetadata.freshTopicRoute(topic, routeData);
if (routeData != null
&& !routeData.getQueueDatas().isEmpty()) {
for (QueueData queueData: routeData.getQueueDatas()) {
String bname = queueData.getBrokerName();
String addr = clientMetadata.findMasterBrokerAddr(bname);
TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic);
//allow the config is null
if (mapping != null) {
brokerConfigMap.put(bname, mapping);
}
}
}
}
if (existedTopicConfigMap.isEmpty()) {
if (brokerConfigMap.isEmpty()) {
throw new RuntimeException("No topic route to do the remapping");
}
//make sure it it not null
existedTopicConfigMap.forEach((key, value) -> {
if (value.getMappingDetail() != null) {
throw new RuntimeException("Mapping info should be null in broker " + key);
}
});
//make sure the detail is not dirty
existedTopicConfigMap.forEach((key, value) -> {
if (!key.equals(value.getMappingDetail().getBname())) {
throw new RuntimeException(String.format("The broker name is not equal %s != %s ", key, value.getMappingDetail().getBname()));
}
if (value.getMappingDetail().isDirty()) {
throw new RuntimeException("The mapping info is dirty in broker " + value.getMappingDetail().getBname());
}
});
List<TopicQueueMappingDetail> detailList = existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList());
//check the epoch and qnum
maxEpochAndNum = TopicQueueMappingUtils.findMaxEpochAndQueueNum(detailList);
for (TopicQueueMappingDetail mappingDetail : detailList) {
if (maxEpochAndNum.getKey() != mappingDetail.getEpoch()) {
throw new RuntimeException(String.format("epoch dose not match %d != %d in %s", maxEpochAndNum.getKey(), mappingDetail.getEpoch(), mappingDetail.getBname()));
}
if (maxEpochAndNum.getValue() != mappingDetail.getTotalQueues()) {
throw new RuntimeException(String.format("total queue number dose not match %d != %d in %s", maxEpochAndNum.getValue(), mappingDetail.getTotalQueues(), mappingDetail.getBname()));
}
}
globalIdMap = TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(detailList), false, true);
final Map.Entry<Long, Integer> maxEpochAndNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
//the check is ok, now do the mapping allocation
int maxNum = maxEpochAndNum.getValue();
long maxEpoch = maxEpochAndNum.getKey();
TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap<>(), brokers.stream().collect(Collectors.toMap( x -> x, x -> 0)));
{
TopicRemappingDetailWrapper oldWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_REMAPPING, maxEpoch, new HashMap<>(), brokerConfigMap);
String oldMappingDataFile = TopicQueueMappingUtils.writeToTemp(oldWrapper, false);
System.out.println("The old mapping data is written to file " + oldMappingDataFile);
}
TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap<>(), targetBrokers.stream().collect(Collectors.toMap( x -> x, x -> 0)));
allocator.upToNum(maxNum);
Map<String, Integer> expectedBrokerNumMap = allocator.getBrokerNumMap();
Queue<Integer> waitAssignQueues = new ArrayDeque<Integer>();
......@@ -213,6 +302,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
expectedIdToBroker.put(queueId, broker);
}
});
long newEpoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis());
//Now construct the remapping info
Set<String> brokersToMapOut = new HashSet<>();
......@@ -230,8 +320,8 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
String mapOutBroker = topicQueueMappingOne.getBname();
brokersToMapIn.add(mapInBroker);
brokersToMapOut.add(mapOutBroker);
TopicConfigAndQueueMapping mapInConfig = existedTopicConfigMap.get(mapInBroker);
TopicConfigAndQueueMapping mapOutConfig = existedTopicConfigMap.get(mapOutBroker);
TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(mapInBroker);
TopicConfigAndQueueMapping mapOutConfig = brokerConfigMap.get(mapOutBroker);
mapInConfig.setWriteQueueNums(mapInConfig.getWriteQueueNums() + 1);
mapInConfig.setWriteQueueNums(mapInConfig.getWriteQueueNums() + 1);
......@@ -246,61 +336,24 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
mapOutConfig.getMappingDetail().putMappingInfo(queueId, resultItems);
}
long epoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis());
existedTopicConfigMap.values().forEach( configMapping -> {
configMapping.getMappingDetail().setEpoch(epoch);
brokerConfigMap.values().forEach(configMapping -> {
configMapping.getMappingDetail().setEpoch(newEpoch);
configMapping.getMappingDetail().setTotalQueues(maxNum);
});
TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList())), false, true);
// now do the remapping
//Step1: let the new leader can be write without the logicOffset
for (String broker: brokersToMapIn) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = existedTopicConfigMap.get(broker);
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
}
//Step2: forbid the write of old leader
for (String broker: brokersToMapOut) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = existedTopicConfigMap.get(broker);
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
}
//Step3: decide the logic offset
for (String broker: brokersToMapOut) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicStatsTable statsTable = defaultMQAdminExt.examineTopicStats(addr, topic);
TopicConfigAndQueueMapping mapOutConfig = existedTopicConfigMap.get(broker);
for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>> entry : mapOutConfig.getMappingDetail().getHostedQueues().entrySet()) {
ImmutableList<LogicQueueMappingItem> items = entry.getValue();
Integer globalId = entry.getKey();
if (items.size() < 2) {
continue;
}
LogicQueueMappingItem newLeader = items.get(items.size() - 1);
LogicQueueMappingItem oldLeader = items.get(items.size() - 2);
if (newLeader.getLogicOffset() > 0) {
continue;
}
TopicOffset topicOffset = statsTable.getOffsetTable().get(new MessageQueue(topic, oldLeader.getBname(), oldLeader.getQueueId()));
if (topicOffset == null) {
throw new RuntimeException("Cannot get the max offset for old leader " + oldLeader);
}
//TODO check the max offset, will it return -1?
if (topicOffset.getMaxOffset() < oldLeader.getStartOffset()) {
throw new RuntimeException("The max offset is smaller then the start offset " + oldLeader + " " + topicOffset.getMaxOffset());
}
newLeader.setLogicOffset(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset() + 10000));
TopicConfigAndQueueMapping mapInConfig = existedTopicConfigMap.get(newLeader.getBname());
//fresh the new leader
mapInConfig.getMappingDetail().putMappingInfo(globalId, items);
}
}
//Step4: write to the new leader with logic offset
for (String broker: brokersToMapIn) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = existedTopicConfigMap.get(broker);
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
//double check
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(brokerConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList())), false, true);
{
TopicRemappingDetailWrapper newWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_REMAPPING, newEpoch, expectedIdToBroker, brokerConfigMap);
newWrapper.setBrokerToMapIn(brokersToMapIn);
newWrapper.setBrokerToMapOut(brokersToMapOut);
String newMappingDataFile = TopicQueueMappingUtils.writeToTemp(newWrapper, true);
System.out.println("The old mapping data is written to file " + newMappingDataFile);
}
doRemapping(topic, brokersToMapIn, brokersToMapOut, brokerConfigMap, clientMetadata, defaultMQAdminExt);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
......
......@@ -39,7 +39,6 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
import java.nio.charset.Charset;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
......@@ -84,7 +83,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
options.addOption(opt);
opt = new Option("f", "mapFile", true, "The map file name");
opt.setRequired(true);
opt.setRequired(false);
options.addOption(opt);
return options;
......@@ -99,12 +98,13 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
ClientMetadata clientMetadata = new ClientMetadata();
try {
String topic = commandLine.getOptionValue('t').trim();
String mapFileName = commandLine.getOptionValue('f').trim();
String mapData = MixAll.file2String(mapFileName);
TopicRemappingDetailWrapper wrapper = TopicRemappingDetailWrapper.decode(mapData.getBytes(), TopicRemappingDetailWrapper.class);
//double check the config
TopicQueueMappingUtils.validConsistenceOfTopicConfigAndQueueMapping(wrapper.getBrokerConfigMap());
TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(wrapper.getBrokerConfigMap().values())), false, true);
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, wrapper.getBrokerConfigMap());
TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(wrapper.getBrokerConfigMap().values())), false, true);
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
if (clusterInfo == null
......@@ -131,6 +131,11 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
@Override
public void execute(final CommandLine commandLine, final Options options,
RPCHook rpcHook) throws SubCommandException {
if (!commandLine.hasOption('t')) {
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
return;
}
if (commandLine.hasOption("f")) {
executeFromFile(commandLine, options, rpcHook);
return;
......@@ -159,7 +164,6 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
}
clientMetadata.refreshClusterInfo(clusterInfo);
String clusters = commandLine.getOptionValue('c').trim();
for (String cluster : clusters.split(",")) {
cluster = cluster.trim();
......@@ -200,8 +204,8 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
Map.Entry<Long, Integer> maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<>(System.currentTimeMillis(), queueNum);
if (!brokerConfigMap.isEmpty()) {
maxEpochAndNum = TopicQueueMappingUtils.validConsistenceOfTopicConfigAndQueueMapping(brokerConfigMap);
globalIdMap = TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
maxEpochAndNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
}
if (queueNum < globalIdMap.size()) {
throw new RuntimeException(String.format("Cannot decrease the queue num for static topic %d < %d", queueNum, globalIdMap.size()));
......@@ -213,7 +217,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
{
TopicRemappingDetailWrapper oldWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, maxEpochAndNum.getKey(), new HashMap<>(), brokerConfigMap);
String oldMappingDataFile = TopicQueueMappingUtils.writeToTemp(oldWrapper, "before");
String oldMappingDataFile = TopicQueueMappingUtils.writeToTemp(oldWrapper, false);
System.out.println("The old mapping data is written to file " + oldMappingDataFile);
}
......@@ -264,12 +268,12 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
configMapping.getMappingDetail().setTotalQueues(queueNum);
});
//double check the config
TopicQueueMappingUtils.validConsistenceOfTopicConfigAndQueueMapping(brokerConfigMap);
TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
{
TopicRemappingDetailWrapper newWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, newEpoch, newIdToBroker, brokerConfigMap);
String newMappingDataFile = TopicQueueMappingUtils.writeToTemp(newWrapper, "after");
String newMappingDataFile = TopicQueueMappingUtils.writeToTemp(newWrapper, true);
System.out.println("The new mapping data is written to file " + newMappingDataFile);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册