diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java index 16f41e5e172dcd84e8886328bcf5d18d0855b9a2..959207e16dd76f28fab145c53e6a97150593d6ca 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java @@ -1,5 +1,7 @@ package org.apache.rocketmq.common.statictopic; +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; public class LogicQueueMappingItem extends RemotingSerializable { @@ -135,6 +137,41 @@ public class LogicQueueMappingItem extends RemotingSerializable { this.startOffset = startOffset; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + + if (!(o instanceof LogicQueueMappingItem)) return false; + + LogicQueueMappingItem item = (LogicQueueMappingItem) o; + + return new EqualsBuilder() + .append(gen, item.gen) + .append(queueId, item.queueId) + .append(logicOffset, item.logicOffset) + .append(startOffset, item.startOffset) + .append(endOffset, item.endOffset) + .append(timeOfStart, item.timeOfStart) + .append(timeOfEnd, item.timeOfEnd) + .append(bname, item.bname) + .isEquals(); + } + + @Override + public int hashCode() { + return new HashCodeBuilder(17, 37) + .append(gen) + .append(queueId) + .append(bname) + .append(logicOffset) + .append(startOffset) + .append(endOffset) + .append(timeOfStart) + .append(timeOfEnd) + .toHashCode(); + } + @Override public String toString() { return "LogicQueueMappingItem{" + diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicConfigAndQueueMapping.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicConfigAndQueueMapping.java index af891d03d984f406e44cb65b751c7e42a7f28cc5..cef6418bda6a461a162198c0cebdc71556c10779 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicConfigAndQueueMapping.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicConfigAndQueueMapping.java @@ -16,6 +16,8 @@ */ package org.apache.rocketmq.common.statictopic; +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.rocketmq.common.TopicConfig; public class TopicConfigAndQueueMapping extends TopicConfig { @@ -36,4 +38,26 @@ public class TopicConfigAndQueueMapping extends TopicConfig { public void setMappingDetail(TopicQueueMappingDetail mappingDetail) { this.mappingDetail = mappingDetail; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + + if (!(o instanceof TopicConfigAndQueueMapping)) return false; + + TopicConfigAndQueueMapping that = (TopicConfigAndQueueMapping) o; + + return new EqualsBuilder() + .appendSuper(super.equals(o)) + .append(mappingDetail, that.mappingDetail) + .isEquals(); + } + + @Override + public int hashCode() { + return new HashCodeBuilder(17, 37) + .appendSuper(super.hashCode()) + .append(mappingDetail) + .toHashCode(); + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java index 4a8bae3ea0ebd8f76710b6e4c5e035fcdac7a309..30db2094bef83d1060523380ad0f2b01cd2e7007 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java @@ -16,6 +16,9 @@ */ package org.apache.rocketmq.common.statictopic; +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; + import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -124,4 +127,24 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { public void setHostedQueues(ConcurrentMap> hostedQueues) { this.hostedQueues = hostedQueues; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + + if (!(o instanceof TopicQueueMappingDetail)) return false; + + TopicQueueMappingDetail that = (TopicQueueMappingDetail) o; + + return new EqualsBuilder() + .append(hostedQueues, that.hostedQueues) + .isEquals(); + } + + @Override + public int hashCode() { + return new HashCodeBuilder(17, 37) + .append(hostedQueues) + .toHashCode(); + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java index 7636fd5d8a97bf63eee5728a3fbaecc32be05e01..53041aaee8a5a54a9f3b7718abd6d3927fa63fef 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java @@ -16,6 +16,8 @@ */ package org.apache.rocketmq.common.statictopic; +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import java.util.concurrent.ConcurrentHashMap; @@ -93,5 +95,33 @@ public class TopicQueueMappingInfo extends RemotingSerializable { this.currIdMap = currIdMap; } + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof TopicQueueMappingInfo)) return false; + + TopicQueueMappingInfo info = (TopicQueueMappingInfo) o; + + return new EqualsBuilder() + .append(totalQueues, info.totalQueues) + .append(epoch, info.epoch) + .append(dirty, info.dirty) + .append(topic, info.topic) + .append(bname, info.bname) + .append(currIdMap, info.currIdMap) + .isEquals(); + } + + @Override + public int hashCode() { + return new HashCodeBuilder(17, 37) + .append(topic) + .append(totalQueues) + .append(bname) + .append(epoch) + .append(dirty) + .append(currIdMap) + .toHashCode(); + } } diff --git a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java index d571f6593d599c202f1c4dd4a80ed64e50fdbed0..9b9ab6b7722bd5733b60a02993fbce51d333e231 100644 --- a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java @@ -31,6 +31,7 @@ public class TopicQueueMappingTest { //test the decode encode { LogicQueueMappingItem mappingItemFromJson = RemotingSerializable.fromJson(mappingItemJson, LogicQueueMappingItem.class); + Assert.assertEquals(mappingItem, mappingItemFromJson); Assert.assertEquals(mappingItemJson, RemotingSerializable.toJson(mappingItemFromJson, false)); } TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail("test", 1, "broker01", System.currentTimeMillis()); @@ -48,6 +49,7 @@ public class TopicQueueMappingTest { TopicQueueMappingDetail mappingDetailFromJson = RemotingSerializable.decode(mappingDetailJson.getBytes(), TopicQueueMappingDetail.class); Assert.assertEquals(1, mappingDetailFromJson.getHostedQueues().size()); Assert.assertEquals(1, mappingDetailFromJson.getHostedQueues().get(0).size()); + Assert.assertEquals(mappingItem, mappingDetailFromJson.getHostedQueues().get(0).get(0)); Assert.assertEquals(mappingDetailJson, RemotingSerializable.toJson(mappingDetailFromJson, false)); } } diff --git a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java index bb21dc294ad7e2148f6a4a7efddcf59f38e71012..73215ba22ec1892ba512128f2bb686557ffedb3e 100644 --- a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java @@ -6,6 +6,7 @@ import org.apache.rocketmq.common.rpc.ClientMetadata; import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne; import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils; +import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper; import org.apache.rocketmq.test.base.BaseConf; import org.apache.rocketmq.test.util.MQRandomUtils; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; @@ -16,6 +17,8 @@ import org.junit.FixMethodOrder; import org.junit.Test; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -42,32 +45,48 @@ public class StaticTopicIT extends BaseConf { clientMetadata.refreshClusterInfo(clusterInfo); } + public Map createStaticTopic(String topic, int queueNum, Set targetBrokers) throws Exception { + Map brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic); + Assert.assertTrue(brokerConfigMap.isEmpty()); + TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap); + //If some succeed, and others fail, it will cause inconsistent data + for (Map.Entry entry : brokerConfigMap.entrySet()) { + String broker = entry.getKey(); + String addr = clientMetadata.findMasterBrokerAddr(broker); + TopicConfigAndQueueMapping configMapping = entry.getValue(); + defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), false); + } + return brokerConfigMap; + } + @Test - public void testCreateStaticTopic() throws Exception { + public void testCreateAndRemappingStaticTopic() throws Exception { String topic = "static" + MQRandomUtils.getRandomTopic(); int queueNum = 10; - Set brokers = getBrokers(); - //create topic + Map brokerConfigMap = createStaticTopic(topic, queueNum, getBrokers()); { - Map brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic); - Assert.assertTrue(brokerConfigMap.isEmpty()); - TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, getBrokers(), brokerConfigMap); - //If some succeed, and others fail, it will cause inconsistent data - for (Map.Entry entry : brokerConfigMap.entrySet()) { + Map brokerConfigMapFromRemote = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic); + Assert.assertEquals(2, brokerConfigMapFromRemote.size()); + for (Map.Entry entry: brokerConfigMapFromRemote.entrySet()) { String broker = entry.getKey(); - String addr = clientMetadata.findMasterBrokerAddr(broker); TopicConfigAndQueueMapping configMapping = entry.getValue(); - defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), false); + TopicConfigAndQueueMapping configMappingLocal = brokerConfigMap.get(broker); + Assert.assertNotNull(configMappingLocal); + Assert.assertEquals(configMapping, configMappingLocal); } + TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMapFromRemote); + Map globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(brokerConfigMapFromRemote.values())), false, true); + Assert.assertEquals(queueNum, globalIdMap.size()); } - Map brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic); - - TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap); - Map globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(brokerConfigMap.values())), false, true); - Assert.assertEquals(queueNum, globalIdMap.size()); + /*{ + Set targetBrokers = Collections.singleton(broker1Name); + Map brokerConfigMapFromRemote = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic); + TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.remappingStaticTopic(topic, brokerConfigMapFromRemote, targetBrokers); + }*/ } + @After public void tearDown() { super.shutdown();