提交 165e133a 编写于 作者: D dongeforever

Enable to run from file for createStaticTopic command

上级 82364fc2
......@@ -17,13 +17,13 @@
package org.apache.rocketmq.common.protocol.body;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class TopicConfigSerializeWrapper extends RemotingSerializable {
private ConcurrentMap<String, TopicConfig> topicConfigTable =
new ConcurrentHashMap<String, TopicConfig>();
......
......@@ -17,9 +17,12 @@
package org.apache.rocketmq.common.statictopic;
import com.google.common.collect.ImmutableList;
import org.apache.rocketmq.common.MixAll;
import java.io.File;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
......@@ -103,6 +106,65 @@ public class TopicQueueMappingUtils {
return new AbstractMap.SimpleImmutableEntry<Long, Integer>(epoch, queueNum);
}
public static List<TopicQueueMappingDetail> getMappingDetailFromConfig(Collection<TopicConfigAndQueueMapping> configs) {
List<TopicQueueMappingDetail> detailList = new ArrayList<TopicQueueMappingDetail>();
for (TopicConfigAndQueueMapping configMapping : configs) {
if (configMapping.getMappingDetail() != null) {
detailList.add(configMapping.getMappingDetail());
}
}
return detailList;
}
public static Map.Entry<Long, Integer> validConsistenceOfTopicConfigAndQueueMapping(Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
if (brokerConfigMap == null
|| brokerConfigMap.isEmpty()) {
return null;
}
//make sure it it not null
String topic = null;
long maxEpoch = -1;
int maxNum = -1;
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
String broker = entry.getKey();
TopicConfigAndQueueMapping configMapping = entry.getValue();
if (configMapping.getMappingDetail() == null) {
throw new RuntimeException("Mapping info should not be null in broker " + broker);
}
TopicQueueMappingDetail mappingDetail = configMapping.getMappingDetail();
if (!broker.equals(mappingDetail.getBname())) {
throw new RuntimeException(String.format("The broker name is not equal %s != %s ", broker, mappingDetail.getBname()));
}
if (mappingDetail.isDirty()) {
throw new RuntimeException("The mapping info is dirty in broker " + broker);
}
if (!configMapping.getTopicName().equals(mappingDetail.getTopic())) {
throw new RuntimeException("The topic name is inconsistent in broker " + broker);
}
if (topic != null
&& !topic.equals(mappingDetail.getTopic())) {
throw new RuntimeException("The topic name is inconsistent in broker " + broker);
} else {
topic = mappingDetail.getTopic();
}
if (maxEpoch != -1
&& maxEpoch != mappingDetail.getEpoch()) {
throw new RuntimeException(String.format("epoch dose not match %d != %d in %s", maxEpoch, mappingDetail.getEpoch(), mappingDetail.getBname()));
} else {
maxEpoch = mappingDetail.getEpoch();
}
if (maxNum != -1
&& maxNum != mappingDetail.getTotalQueues()) {
throw new RuntimeException(String.format("total queue number dose not match %d != %d in %s", maxNum, mappingDetail.getTotalQueues(), mappingDetail.getBname()));
} else {
maxNum = mappingDetail.getTotalQueues();
}
}
return new AbstractMap.SimpleEntry<Long, Integer>(maxEpoch, maxNum);
}
public static Map<Integer, TopicQueueMappingOne> buildMappingItems(List<TopicQueueMappingDetail> mappingDetailList, boolean replace, boolean checkConsistence) {
Collections.sort(mappingDetailList, new Comparator<TopicQueueMappingDetail>() {
@Override
......@@ -153,4 +215,19 @@ public class TopicQueueMappingUtils {
assert items.size() > 0;
return items.get(items.size() - 1);
}
public static String writeToTemp(TopicRemappingDetailWrapper wrapper, String suffix) {
String topic = wrapper.getTopic();
String data = wrapper.toJson();
String fileName = System.getProperty("java.io.tmpdir") + File.separator + topic + "-" + wrapper.getEpoch() + "-" + suffix;
try {
MixAll.string2File(data, fileName);
return fileName;
} catch (Exception e) {
throw new RuntimeException("write file failed " + fileName,e);
}
}
}
package org.apache.rocketmq.common.statictopic;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import java.util.HashMap;
import java.util.Map;
public class TopicRemappingDetailWrapper extends RemotingSerializable {
public static final String TYPE_CREATE_OR_UPDATE = "CREATE_OR_UPDATE";
public static final String TYPE_REMAPPING = "REMAPPING";
private final String topic;
private final String type;
private final long epoch;
private Map<Integer, String> expectedIdToBroker = new HashMap<Integer, String>();
private Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>();
public TopicRemappingDetailWrapper(String topic, String type, long epoch, Map<Integer, String> expectedIdToBroker, Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
this.topic = topic;
this.type = type;
this.epoch = epoch;
this.expectedIdToBroker = expectedIdToBroker;
this.brokerConfigMap = brokerConfigMap;
}
public String getTopic() {
return topic;
}
public String getType() {
return type;
}
public long getEpoch() {
return epoch;
}
public Map<Integer, String> getExpectedIdToBroker() {
return expectedIdToBroker;
}
public Map<String, TopicConfigAndQueueMapping> getBrokerConfigMap() {
return brokerConfigMap;
}
}
package org.apache.rocketmq.common;
package org.apache.rocketmq.common.statictopic;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
......@@ -9,10 +9,18 @@ import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
import java.util.Map;
public class TopicQueueMappingTest {
@Test
public void testWriteToFile() {
System.out.println(System.getProperty("java.io.tmpdir"));
System.out.println(File.separator);
}
@Test
public void testJsonSerialize() {
LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(1, 2, "broker01", 33333333333333333L, 44444444444444444L, 555555555555555555L, 6666666666666666L, 77777777777777777L);
......@@ -30,7 +38,7 @@ public class TopicQueueMappingTest {
Assert.assertEquals(mappingItemMap.get("timeOfStart"), mappingItem.getTimeOfStart());
Assert.assertEquals(mappingItemMap.get("timeOfEnd"), mappingItem.getTimeOfEnd());
TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail("test", 1, "broker01");
TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail("test", 1, "broker01", System.currentTimeMillis());
mappingDetail.putMappingInfo(0, ImmutableList.of(mappingItem));
String mappingDetailJson = JSON.toJSONString(mappingDetail);
......
......@@ -21,6 +21,7 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
......@@ -31,17 +32,18 @@ import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.rpc.ClientMetadata;
import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
import java.nio.charset.Charset;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
......@@ -81,18 +83,64 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
opt.setRequired(true);
options.addOption(opt);
opt = new Option("f", "mapFile", true, "The map file name");
opt.setRequired(true);
options.addOption(opt);
return options;
}
public void executeFromFile(final CommandLine commandLine, final Options options,
RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
ClientMetadata clientMetadata = new ClientMetadata();
try {
String mapFileName = commandLine.getOptionValue('f').trim();
String mapData = MixAll.file2String(mapFileName);
TopicRemappingDetailWrapper wrapper = TopicRemappingDetailWrapper.decode(mapData.getBytes(), TopicRemappingDetailWrapper.class);
//double check the config
TopicQueueMappingUtils.validConsistenceOfTopicConfigAndQueueMapping(wrapper.getBrokerConfigMap());
TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(wrapper.getBrokerConfigMap().values())), false, true);
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
if (clusterInfo == null
|| clusterInfo.getClusterAddrTable().isEmpty()) {
throw new RuntimeException("The Cluster info is empty");
}
clientMetadata.refreshClusterInfo(clusterInfo);
//If some succeed, and others fail, it will cause inconsistent data
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : wrapper.getBrokerConfigMap().entrySet()) {
String broker = entry.getKey();
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = entry.getValue();
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
}
return;
}catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
}
@Override
public void execute(final CommandLine commandLine, final Options options,
RPCHook rpcHook) throws SubCommandException {
if (commandLine.hasOption("f")) {
executeFromFile(commandLine, options, rpcHook);
return;
}
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
ClientMetadata clientMetadata = new ClientMetadata();
Map<String, TopicConfigAndQueueMapping> existedTopicConfigMap = new HashMap<>();
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<>();
Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<>();
Set<String> brokers = new HashSet<>();
......@@ -111,8 +159,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
}
clientMetadata.refreshClusterInfo(clusterInfo);
String topic = commandLine.getOptionValue('t').trim();
int queueNum = Integer.parseInt(commandLine.getOptionValue("qn").trim());
String clusters = commandLine.getOptionValue('c').trim();
for (String cluster : clusters.split(",")) {
cluster = cluster.trim();
......@@ -131,8 +178,12 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
}
//get the existed topic config and mapping
String topic = commandLine.getOptionValue('t').trim();
int queueNum = Integer.parseInt(commandLine.getOptionValue("qn").trim());
{
TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic);
clientMetadata.freshTopicRoute(topic, routeData);
if (routeData != null
&& !routeData.getQueueDatas().isEmpty()) {
for (QueueData queueData: routeData.getQueueDatas()) {
......@@ -141,41 +192,16 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic);
//allow the config is null
if (mapping != null) {
existedTopicConfigMap.put(bname, mapping);
brokerConfigMap.put(bname, mapping);
}
}
}
Map.Entry<Long, Integer> maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<>(System.currentTimeMillis(), queueNum);
if (!existedTopicConfigMap.isEmpty()) {
//make sure it it not null
existedTopicConfigMap.forEach((key, value) -> {
if (value.getMappingDetail() != null) {
throw new RuntimeException("Mapping info should be null in broker " + key);
}
});
//make sure the detail is not dirty
existedTopicConfigMap.forEach((key, value) -> {
if (!key.equals(value.getMappingDetail().getBname())) {
throw new RuntimeException(String.format("The broker name is not equal %s != %s ", key, value.getMappingDetail().getBname()));
}
if (value.getMappingDetail().isDirty()) {
throw new RuntimeException("The mapping info is dirty in broker " + value.getMappingDetail().getBname());
}
});
List<TopicQueueMappingDetail> detailList = existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList());
//check the epoch and qnum
maxEpochAndNum = TopicQueueMappingUtils.findMaxEpochAndQueueNum(detailList);
for (TopicQueueMappingDetail mappingDetail : detailList) {
if (maxEpochAndNum.getKey() != mappingDetail.getEpoch()) {
throw new RuntimeException(String.format("epoch dose not match %d != %d in %s", maxEpochAndNum.getKey(), mappingDetail.getEpoch(), mappingDetail.getBname()));
}
if (maxEpochAndNum.getValue() != mappingDetail.getTotalQueues()) {
throw new RuntimeException(String.format("total queue number dose not match %d != %d in %s", maxEpochAndNum.getValue(), mappingDetail.getTotalQueues(), mappingDetail.getBname()));
}
}
globalIdMap = TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(detailList), false, true);
Map.Entry<Long, Integer> maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<>(System.currentTimeMillis(), queueNum);
if (!brokerConfigMap.isEmpty()) {
maxEpochAndNum = TopicQueueMappingUtils.validConsistenceOfTopicConfigAndQueueMapping(brokerConfigMap);
globalIdMap = TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
}
if (queueNum < globalIdMap.size()) {
throw new RuntimeException(String.format("Cannot decrease the queue num for static topic %d < %d", queueNum, globalIdMap.size()));
......@@ -184,24 +210,32 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
if (queueNum == globalIdMap.size()) {
throw new RuntimeException("The topic queue num is equal the existed queue num, do nothing");
}
{
TopicRemappingDetailWrapper oldWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, maxEpochAndNum.getKey(), new HashMap<>(), brokerConfigMap);
String oldMappingDataFile = TopicQueueMappingUtils.writeToTemp(oldWrapper, "before");
System.out.println("The old mapping data is written to file " + oldMappingDataFile);
}
//the check is ok, now do the mapping allocation
Map<String, Integer> brokerNumMap = brokers.stream().collect(Collectors.toMap( x -> x, x -> 0));
Map<Integer, String> idToBroker = new HashMap<>();
final Map<Integer, String> oldIdToBroker = new HashMap<>();
globalIdMap.forEach((key, value) -> {
String leaderbroker = value.getBname();
idToBroker.put(key, leaderbroker);
oldIdToBroker.put(key, leaderbroker);
if (!brokerNumMap.containsKey(leaderbroker)) {
brokerNumMap.put(leaderbroker, 1);
} else {
brokerNumMap.put(leaderbroker, brokerNumMap.get(leaderbroker) + 1);
}
});
TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(idToBroker, brokerNumMap);
TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(oldIdToBroker, brokerNumMap);
allocator.upToNum(queueNum);
Map<Integer, String> newIdToBroker = allocator.getIdToBroker();
//construct the topic configAndMapping
long epoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis());
long newEpoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis());
for (Map.Entry<Integer, String> e : newIdToBroker.entrySet()) {
Integer queueId = e.getKey();
String broker = e.getValue();
......@@ -210,13 +244,13 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
continue;
}
TopicConfigAndQueueMapping configMapping;
if (!existedTopicConfigMap.containsKey(broker)) {
if (!brokerConfigMap.containsKey(broker)) {
configMapping = new TopicConfigAndQueueMapping(new TopicConfig(topic), new TopicQueueMappingDetail(topic, 0, broker, -1));
configMapping.setWriteQueueNums(1);
configMapping.setReadQueueNums(1);
existedTopicConfigMap.put(broker, configMapping);
brokerConfigMap.put(broker, configMapping);
} else {
configMapping = existedTopicConfigMap.get(broker);
configMapping = brokerConfigMap.get(broker);
configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
configMapping.setReadQueueNums(configMapping.getReadQueueNums() + 1);
}
......@@ -224,15 +258,23 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem));
}
//double check the topic config map
existedTopicConfigMap.values().forEach( configMapping -> {
configMapping.getMappingDetail().setEpoch(epoch);
// set the topic config
brokerConfigMap.values().forEach(configMapping -> {
configMapping.getMappingDetail().setEpoch(newEpoch);
configMapping.getMappingDetail().setTotalQueues(queueNum);
});
TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList())), false, true);
//double check the config
TopicQueueMappingUtils.validConsistenceOfTopicConfigAndQueueMapping(brokerConfigMap);
TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
{
TopicRemappingDetailWrapper newWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, newEpoch, newIdToBroker, brokerConfigMap);
String newMappingDataFile = TopicQueueMappingUtils.writeToTemp(newWrapper, "after");
System.out.println("The new mapping data is written to file " + newMappingDataFile);
}
//If some succeed, and others fail, it will cause inconsistent data
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : existedTopicConfigMap.entrySet()) {
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
String broker = entry.getKey();
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = entry.getValue();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册