diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java index 9b6775122bf0f288f327d7c744a3aa223579d7c0..188bfcc8b792c9f86bb0efedd633d8b82c56c550 100644 --- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java +++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java @@ -29,7 +29,7 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { // make sure this value is not null private ConcurrentMap> hostedQueues = new ConcurrentHashMap>(); - public TopicQueueMappingDetail(String topic, int totalQueues, String bname, int epoch) { + public TopicQueueMappingDetail(String topic, int totalQueues, String bname, long epoch) { super(topic, totalQueues, bname, epoch); buildIdMap(); } diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java index b4bf77600ace544252b1d95745eead9308419427..f8c803c12c8f8b48bf9ad249ae733d666b2f230a 100644 --- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java +++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java @@ -28,14 +28,14 @@ public class TopicQueueMappingInfo extends RemotingSerializable { String topic; // redundant field int totalQueues; String bname; //identify the hosted broker name - int epoch; //important to fence the old dirty data + long epoch; //important to fence the old dirty data boolean dirty; //indicate if the data is dirty //register to broker to construct the route transient ConcurrentMap currIdMap = new ConcurrentHashMap(); //register to broker to help detect remapping failure transient ConcurrentMap prevIdMap = new ConcurrentHashMap(); - public TopicQueueMappingInfo(String topic, int totalQueues, String bname, int epoch) { + public TopicQueueMappingInfo(String topic, int totalQueues, String bname, long epoch) { this.topic = topic; this.totalQueues = totalQueues; this.bname = bname; @@ -64,11 +64,11 @@ public class TopicQueueMappingInfo extends RemotingSerializable { return topic; } - public int getEpoch() { + public long getEpoch() { return epoch; } - public void setEpoch(int epoch) { + public void setEpoch(long epoch) { this.epoch = epoch; } diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java index 87c54a33befb0b07bdd120781a2559c3bea1ff13..686208a8ea0edad56a7de3ca0aa8591907277139 100644 --- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java @@ -89,8 +89,8 @@ public class TopicQueueMappingUtils { return new MappingAllocator(idToBroker, brokerNumMap); } - public static Map.Entry findMaxEpochAndQueueNum(List mappingDetailList) { - int epoch = -1; + public static Map.Entry findMaxEpochAndQueueNum(List mappingDetailList) { + long epoch = -1; int queueNum = 0; for (TopicQueueMappingDetail mappingDetail : mappingDetailList) { if (mappingDetail.getEpoch() > epoch) { @@ -100,14 +100,14 @@ public class TopicQueueMappingUtils { queueNum = mappingDetail.getTotalQueues(); } } - return new AbstractMap.SimpleImmutableEntry(epoch, queueNum); + return new AbstractMap.SimpleImmutableEntry(epoch, queueNum); } public static Map> buildMappingItems(List mappingDetailList, boolean replace) { Collections.sort(mappingDetailList, new Comparator() { @Override public int compare(TopicQueueMappingDetail o1, TopicQueueMappingDetail o2) { - return o2.getEpoch() - o1.getEpoch(); + return (int)(o2.getEpoch() - o1.getEpoch()); } }); diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigAndMappingSerializeWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigAndMappingSerializeWrapper.java index e6a34c4ef861af3e1a8bfc103364fe6813183bb9..ba494aa7ab5815fa04b217198186b458a39f1e67 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigAndMappingSerializeWrapper.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigAndMappingSerializeWrapper.java @@ -35,7 +35,7 @@ public class TopicConfigAndMappingSerializeWrapper extends TopicConfigSerializeW public static TopicConfigAndMappingSerializeWrapper from(TopicConfigSerializeWrapper wrapper) { if (wrapper instanceof TopicConfigAndMappingSerializeWrapper) { - return (TopicConfigAndMappingSerializeWrapper)wrapper; + return (TopicConfigAndMappingSerializeWrapper) wrapper; } TopicConfigAndMappingSerializeWrapper mappingSerializeWrapper = new TopicConfigAndMappingSerializeWrapper(); mappingSerializeWrapper.setDataVersion(wrapper.getDataVersion()); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java index a1d3f6ff0083c48fe4f82d68d80ce5c3c2d29c07..5cee8c8f95226c17ca1637bb16f7d87fd76cbffd 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java @@ -134,7 +134,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand { } } - Map.Entry maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<>(-1, queueNum); + Map.Entry maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<>(System.currentTimeMillis(), queueNum); if (!existedTopicConfigMap.isEmpty()) { //make sure it it not null existedTopicConfigMap.forEach((key, value) -> { @@ -155,7 +155,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand { List detailList = existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList()); //check the epoch and qnum maxEpochAndNum = TopicQueueMappingUtils.findMaxEpochAndQueueNum(detailList); - final Map.Entry tmpMaxEpochAndNum = maxEpochAndNum; + final Map.Entry tmpMaxEpochAndNum = maxEpochAndNum; detailList.forEach( mappingDetail -> { if (tmpMaxEpochAndNum.getKey() != mappingDetail.getEpoch()) { throw new RuntimeException(String.format("epoch dose not match %d != %d in %s", tmpMaxEpochAndNum.getKey(), mappingDetail.getEpoch(), mappingDetail.getBname())); @@ -200,7 +200,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand { Map newIdToBroker = allocator.getIdToBroker(); //construct the topic configAndMapping - int epoch = maxEpochAndNum.getKey() + 1; + long epoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis()); newIdToBroker.forEach( (queueId, broker) -> { TopicConfigAndQueueMapping configMapping; if (!existedTopicConfigMap.containsKey(broker)) { @@ -218,6 +218,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand { configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem)); }); + //If some succeed, and others fail, it will cause inconsistent data for (Map.Entry entry : existedTopicConfigMap.entrySet()) { String broker = entry.getKey(); String addr = clientMetadata.findMasterBrokerAddr(broker);