提交 cb7e032f 编写于 作者: D dongeforever

Finish the test for createStaticTopic

上级 a493ca44
package org.apache.rocketmq.common.statictopic; package org.apache.rocketmq.common.statictopic;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
public class LogicQueueMappingItem extends RemotingSerializable { public class LogicQueueMappingItem extends RemotingSerializable {
...@@ -135,6 +137,41 @@ public class LogicQueueMappingItem extends RemotingSerializable { ...@@ -135,6 +137,41 @@ public class LogicQueueMappingItem extends RemotingSerializable {
this.startOffset = startOffset; this.startOffset = startOffset;
} }
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof LogicQueueMappingItem)) return false;
LogicQueueMappingItem item = (LogicQueueMappingItem) o;
return new EqualsBuilder()
.append(gen, item.gen)
.append(queueId, item.queueId)
.append(logicOffset, item.logicOffset)
.append(startOffset, item.startOffset)
.append(endOffset, item.endOffset)
.append(timeOfStart, item.timeOfStart)
.append(timeOfEnd, item.timeOfEnd)
.append(bname, item.bname)
.isEquals();
}
@Override
public int hashCode() {
return new HashCodeBuilder(17, 37)
.append(gen)
.append(queueId)
.append(bname)
.append(logicOffset)
.append(startOffset)
.append(endOffset)
.append(timeOfStart)
.append(timeOfEnd)
.toHashCode();
}
@Override @Override
public String toString() { public String toString() {
return "LogicQueueMappingItem{" + return "LogicQueueMappingItem{" +
......
...@@ -16,6 +16,8 @@ ...@@ -16,6 +16,8 @@
*/ */
package org.apache.rocketmq.common.statictopic; package org.apache.rocketmq.common.statictopic;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
public class TopicConfigAndQueueMapping extends TopicConfig { public class TopicConfigAndQueueMapping extends TopicConfig {
...@@ -36,4 +38,26 @@ public class TopicConfigAndQueueMapping extends TopicConfig { ...@@ -36,4 +38,26 @@ public class TopicConfigAndQueueMapping extends TopicConfig {
public void setMappingDetail(TopicQueueMappingDetail mappingDetail) { public void setMappingDetail(TopicQueueMappingDetail mappingDetail) {
this.mappingDetail = mappingDetail; this.mappingDetail = mappingDetail;
} }
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof TopicConfigAndQueueMapping)) return false;
TopicConfigAndQueueMapping that = (TopicConfigAndQueueMapping) o;
return new EqualsBuilder()
.appendSuper(super.equals(o))
.append(mappingDetail, that.mappingDetail)
.isEquals();
}
@Override
public int hashCode() {
return new HashCodeBuilder(17, 37)
.appendSuper(super.hashCode())
.append(mappingDetail)
.toHashCode();
}
} }
...@@ -16,6 +16,9 @@ ...@@ -16,6 +16,9 @@
*/ */
package org.apache.rocketmq.common.statictopic; package org.apache.rocketmq.common.statictopic;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
...@@ -124,4 +127,24 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { ...@@ -124,4 +127,24 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
public void setHostedQueues(ConcurrentMap<Integer, List<LogicQueueMappingItem>> hostedQueues) { public void setHostedQueues(ConcurrentMap<Integer, List<LogicQueueMappingItem>> hostedQueues) {
this.hostedQueues = hostedQueues; this.hostedQueues = hostedQueues;
} }
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof TopicQueueMappingDetail)) return false;
TopicQueueMappingDetail that = (TopicQueueMappingDetail) o;
return new EqualsBuilder()
.append(hostedQueues, that.hostedQueues)
.isEquals();
}
@Override
public int hashCode() {
return new HashCodeBuilder(17, 37)
.append(hostedQueues)
.toHashCode();
}
} }
...@@ -16,6 +16,8 @@ ...@@ -16,6 +16,8 @@
*/ */
package org.apache.rocketmq.common.statictopic; package org.apache.rocketmq.common.statictopic;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
...@@ -93,5 +95,33 @@ public class TopicQueueMappingInfo extends RemotingSerializable { ...@@ -93,5 +95,33 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
this.currIdMap = currIdMap; this.currIdMap = currIdMap;
} }
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof TopicQueueMappingInfo)) return false;
TopicQueueMappingInfo info = (TopicQueueMappingInfo) o;
return new EqualsBuilder()
.append(totalQueues, info.totalQueues)
.append(epoch, info.epoch)
.append(dirty, info.dirty)
.append(topic, info.topic)
.append(bname, info.bname)
.append(currIdMap, info.currIdMap)
.isEquals();
}
@Override
public int hashCode() {
return new HashCodeBuilder(17, 37)
.append(topic)
.append(totalQueues)
.append(bname)
.append(epoch)
.append(dirty)
.append(currIdMap)
.toHashCode();
}
} }
...@@ -31,6 +31,7 @@ public class TopicQueueMappingTest { ...@@ -31,6 +31,7 @@ public class TopicQueueMappingTest {
//test the decode encode //test the decode encode
{ {
LogicQueueMappingItem mappingItemFromJson = RemotingSerializable.fromJson(mappingItemJson, LogicQueueMappingItem.class); LogicQueueMappingItem mappingItemFromJson = RemotingSerializable.fromJson(mappingItemJson, LogicQueueMappingItem.class);
Assert.assertEquals(mappingItem, mappingItemFromJson);
Assert.assertEquals(mappingItemJson, RemotingSerializable.toJson(mappingItemFromJson, false)); Assert.assertEquals(mappingItemJson, RemotingSerializable.toJson(mappingItemFromJson, false));
} }
TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail("test", 1, "broker01", System.currentTimeMillis()); TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail("test", 1, "broker01", System.currentTimeMillis());
...@@ -48,6 +49,7 @@ public class TopicQueueMappingTest { ...@@ -48,6 +49,7 @@ public class TopicQueueMappingTest {
TopicQueueMappingDetail mappingDetailFromJson = RemotingSerializable.decode(mappingDetailJson.getBytes(), TopicQueueMappingDetail.class); TopicQueueMappingDetail mappingDetailFromJson = RemotingSerializable.decode(mappingDetailJson.getBytes(), TopicQueueMappingDetail.class);
Assert.assertEquals(1, mappingDetailFromJson.getHostedQueues().size()); Assert.assertEquals(1, mappingDetailFromJson.getHostedQueues().size());
Assert.assertEquals(1, mappingDetailFromJson.getHostedQueues().get(0).size()); Assert.assertEquals(1, mappingDetailFromJson.getHostedQueues().get(0).size());
Assert.assertEquals(mappingItem, mappingDetailFromJson.getHostedQueues().get(0).get(0));
Assert.assertEquals(mappingDetailJson, RemotingSerializable.toJson(mappingDetailFromJson, false)); Assert.assertEquals(mappingDetailJson, RemotingSerializable.toJson(mappingDetailFromJson, false));
} }
} }
......
...@@ -6,6 +6,7 @@ import org.apache.rocketmq.common.rpc.ClientMetadata; ...@@ -6,6 +6,7 @@ import org.apache.rocketmq.common.rpc.ClientMetadata;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne; import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils; import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
import org.apache.rocketmq.test.base.BaseConf; import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.util.MQRandomUtils; import org.apache.rocketmq.test.util.MQRandomUtils;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
...@@ -16,6 +17,8 @@ import org.junit.FixMethodOrder; ...@@ -16,6 +17,8 @@ import org.junit.FixMethodOrder;
import org.junit.Test; import org.junit.Test;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
...@@ -42,32 +45,48 @@ public class StaticTopicIT extends BaseConf { ...@@ -42,32 +45,48 @@ public class StaticTopicIT extends BaseConf {
clientMetadata.refreshClusterInfo(clusterInfo); clientMetadata.refreshClusterInfo(clusterInfo);
} }
public Map<String, TopicConfigAndQueueMapping> createStaticTopic(String topic, int queueNum, Set<String> targetBrokers) throws Exception {
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
Assert.assertTrue(brokerConfigMap.isEmpty());
TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap);
//If some succeed, and others fail, it will cause inconsistent data
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
String broker = entry.getKey();
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = entry.getValue();
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), false);
}
return brokerConfigMap;
}
@Test @Test
public void testCreateStaticTopic() throws Exception { public void testCreateAndRemappingStaticTopic() throws Exception {
String topic = "static" + MQRandomUtils.getRandomTopic(); String topic = "static" + MQRandomUtils.getRandomTopic();
int queueNum = 10; int queueNum = 10;
Set<String> brokers = getBrokers(); Map<String, TopicConfigAndQueueMapping> brokerConfigMap = createStaticTopic(topic, queueNum, getBrokers());
//create topic
{ {
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic); Map<String, TopicConfigAndQueueMapping> brokerConfigMapFromRemote = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
Assert.assertTrue(brokerConfigMap.isEmpty()); Assert.assertEquals(2, brokerConfigMapFromRemote.size());
TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, getBrokers(), brokerConfigMap); for (Map.Entry<String, TopicConfigAndQueueMapping> entry: brokerConfigMapFromRemote.entrySet()) {
//If some succeed, and others fail, it will cause inconsistent data
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
String broker = entry.getKey(); String broker = entry.getKey();
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = entry.getValue(); TopicConfigAndQueueMapping configMapping = entry.getValue();
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), false); TopicConfigAndQueueMapping configMappingLocal = brokerConfigMap.get(broker);
Assert.assertNotNull(configMappingLocal);
Assert.assertEquals(configMapping, configMappingLocal);
} }
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMapFromRemote);
Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(brokerConfigMapFromRemote.values())), false, true);
Assert.assertEquals(queueNum, globalIdMap.size());
} }
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic); /*{
Set<String> targetBrokers = Collections.singleton(broker1Name);
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap); Map<String, TopicConfigAndQueueMapping> brokerConfigMapFromRemote = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(brokerConfigMap.values())), false, true); TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.remappingStaticTopic(topic, brokerConfigMapFromRemote, targetBrokers);
Assert.assertEquals(queueNum, globalIdMap.size());
}*/
} }
@After @After
public void tearDown() { public void tearDown() {
super.shutdown(); super.shutdown();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册