提交 2ee58971 编写于 作者: D dongeforever

Fix the stability of remapping

上级 5ce093e1
......@@ -39,6 +39,8 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
super(topic, totalQueues, bname, epoch);
}
public static boolean putMappingInfo(TopicQueueMappingDetail mappingDetail, Integer globalId, List<LogicQueueMappingItem> mappingInfo) {
if (mappingInfo.isEmpty()) {
return true;
......
......@@ -39,12 +39,15 @@ public class TopicQueueMappingUtils {
public static class MappingAllocator {
Map<String, Integer> brokerNumMap = new HashMap<String, Integer>();
Map<Integer, String> idToBroker = new HashMap<Integer, String>();
//used for remapping
Map<String, Integer> brokerNumMapBeforeRemapping = null;
int currentIndex = 0;
Random random = new Random();
List<String> leastBrokers = new ArrayList<String>();
private MappingAllocator(Map<Integer, String> idToBroker, Map<String, Integer> brokerNumMap) {
private MappingAllocator(Map<Integer, String> idToBroker, Map<String, Integer> brokerNumMap, Map<String, Integer> brokerNumMapBeforeRemapping) {
this.idToBroker.putAll(idToBroker);
this.brokerNumMap.putAll(brokerNumMap);
this.brokerNumMapBeforeRemapping = brokerNumMapBeforeRemapping;
}
private void freshState() {
......@@ -58,7 +61,27 @@ public class TopicQueueMappingUtils {
leastBrokers.add(entry.getKey());
}
}
currentIndex = random.nextInt(leastBrokers.size());
//reduce the remapping
if (brokerNumMapBeforeRemapping != null
&& !brokerNumMapBeforeRemapping.isEmpty()) {
Collections.sort(leastBrokers, new Comparator<String>() {
@Override
public int compare(String o1, String o2) {
int i1 = 0, i2 = 0;
if (brokerNumMapBeforeRemapping.containsKey(o1)) {
i1 = brokerNumMapBeforeRemapping.get(o1);
}
if (brokerNumMapBeforeRemapping.containsKey(o2)) {
i2 = brokerNumMapBeforeRemapping.get(o2);
}
return i1 - i2;
}
});
} else {
//reduce the imbalance
Collections.shuffle(leastBrokers);
}
currentIndex = leastBrokers.size() - 1;
}
private String nextBroker() {
if (leastBrokers.isEmpty()) {
......@@ -93,8 +116,9 @@ public class TopicQueueMappingUtils {
}
}
public static MappingAllocator buildMappingAllocator(Map<Integer, String> idToBroker, Map<String, Integer> brokerNumMap) {
return new MappingAllocator(idToBroker, brokerNumMap);
public static MappingAllocator buildMappingAllocator(Map<Integer, String> idToBroker, Map<String, Integer> brokerNumMap, Map<String, Integer> brokerNumMapBeforeRemapping) {
return new MappingAllocator(idToBroker, brokerNumMap, brokerNumMapBeforeRemapping);
}
public static Map.Entry<Long, Integer> findMaxEpochAndQueueNum(List<TopicQueueMappingDetail> mappingDetailList) {
......@@ -367,16 +391,28 @@ public class TopicQueueMappingUtils {
}
}
public static void checkIfTargetBrokersComplete(Set<String> targetBrokers, Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
public static void checkTargetBrokersComplete(Set<String> targetBrokers, Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
for (String broker : brokerConfigMap.keySet()) {
if (brokerConfigMap.get(broker).getMappingDetail().getHostedQueues().isEmpty()) {
continue;
}
if (!targetBrokers.contains(broker)) {
throw new RuntimeException("The existed broker " + broker + " dose not in target brokers ");
}
}
}
public static TopicRemappingDetailWrapper createTopicConfigMapping(String topic, int queueNum, Set<String> targetBrokers, Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
checkIfTargetBrokersComplete(targetBrokers, brokerConfigMap);
public static void checkNonTargetBrokers(Set<String> targetBrokers, Set<String> nonTargetBrokers) {
for (String broker : nonTargetBrokers) {
if (targetBrokers.contains(broker)) {
throw new RuntimeException("The non-target broker exist in target broker");
}
}
}
public static TopicRemappingDetailWrapper createTopicConfigMapping(String topic, int queueNum, Set<String> targetBrokers, Set<String> nonTargetBrokers, Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
checkTargetBrokersComplete(targetBrokers, brokerConfigMap);
checkNonTargetBrokers(targetBrokers, nonTargetBrokers);
Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<Integer, TopicQueueMappingOne>();
Map.Entry<Long, Integer> maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<Long, Integer>(System.currentTimeMillis(), queueNum);
if (!brokerConfigMap.isEmpty()) {
......@@ -408,7 +444,7 @@ public class TopicQueueMappingUtils {
brokerNumMap.put(leaderbroker, brokerNumMap.get(leaderbroker) + 1);
}
}
TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(oldIdToBroker, brokerNumMap);
TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(oldIdToBroker, brokerNumMap, null);
allocator.upToNum(queueNum);
Map<Integer, String> newIdToBroker = allocator.getIdToBroker();
......@@ -436,6 +472,12 @@ public class TopicQueueMappingUtils {
TopicQueueMappingDetail.putMappingInfo(configMapping.getMappingDetail(), queueId, new ArrayList<LogicQueueMappingItem>(Collections.singletonList(mappingItem)));
}
//set the non target brokers
for (String broker : nonTargetBrokers) {
if (!brokerConfigMap.containsKey(broker)) {
brokerConfigMap.put(broker, new TopicConfigAndQueueMapping(new TopicConfig(topic, 0, 0), new TopicQueueMappingDetail(topic, queueNum, broker, newEpoch)));
}
}
// set the topic config
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
TopicConfigAndQueueMapping configMapping = entry.getValue();
......@@ -466,10 +508,20 @@ public class TopicQueueMappingUtils {
for (String broker: targetBrokers) {
brokerNumMap.put(broker, 0);
}
TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap<Integer, String>(), brokerNumMap);
Map<String, Integer> brokerNumMapBeforeRemapping = new HashMap<String, Integer>();
for (TopicQueueMappingOne mappingOne: globalIdMap.values()) {
if(brokerNumMapBeforeRemapping.containsKey(mappingOne.bname)) {
brokerNumMapBeforeRemapping.put(mappingOne.bname, brokerNumMapBeforeRemapping.get(mappingOne.bname) + 1);
} else {
brokerNumMapBeforeRemapping.put(mappingOne.bname, 1);
}
}
TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap<Integer, String>(), brokerNumMap, brokerNumMapBeforeRemapping);
allocator.upToNum(maxNum);
Map<String, Integer> expectedBrokerNumMap = allocator.getBrokerNumMap();
Queue<Integer> waitAssignQueues = new ArrayDeque<Integer>();
//cannot directly use the idBrokerMap from allocator, for the number of globalId maybe not in the natural order
Map<Integer, String> expectedIdToBroker = new HashMap<Integer, String>();
//the following logic will make sure that, for one broker, either "map in" or "map out"
//It can't both, map in some queues but also map out some queues.
......
......@@ -10,6 +10,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
public class TopicMappingUtilsTest {
......@@ -35,6 +36,18 @@ public class TopicMappingUtilsTest {
return map;
}
private Map<String, Integer> buildBrokerNumMap(int num, int queues) {
Map<String, Integer> map = new HashMap<String, Integer>();
int random = new Random().nextInt(num);
for (int i = 0; i < num; i++) {
map.put("broker" + i, queues);
if (i == random) {
map.put("broker" + i, queues + 1);
}
}
return map;
}
private void testIdToBroker(Map<Integer, String> idToBroker, Map<String, Integer> brokerNumMap) {
Map<String, Integer> brokerNumOther = new HashMap<String, Integer>();
for (int i = 0; i < idToBroker.size(); i++) {
......@@ -58,7 +71,7 @@ public class TopicMappingUtilsTest {
for (int i = 0; i < 10; i++) {
int num = 3;
Map<String, Integer> brokerNumMap = buildBrokerNumMap(num);
TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap<Integer, String>(), brokerNumMap);
TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap<Integer, String>(), brokerNumMap, null);
allocator.upToNum(num * 2);
for (Map.Entry<String, Integer> entry: allocator.getBrokerNumMap().entrySet()) {
Assert.assertEquals(2L, entry.getValue().longValue());
......@@ -77,6 +90,18 @@ public class TopicMappingUtilsTest {
}
}
@Test
public void testRemappingAllocator() {
for (int i = 0; i < 10; i++) {
int num = (i + 2) * 2;
Map<String, Integer> brokerNumMap = buildBrokerNumMap(num);
Map<String, Integer> brokerNumMapBeforeRemapping = buildBrokerNumMap(num, num);
TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap<Integer, String>(), brokerNumMap, brokerNumMapBeforeRemapping);
allocator.upToNum(num * num + 1);
Assert.assertEquals(brokerNumMapBeforeRemapping, allocator.getBrokerNumMap());
}
}
@Test(expected = RuntimeException.class)
public void testTargetBrokersComplete() {
......@@ -86,8 +111,10 @@ public class TopicMappingUtilsTest {
Set<String> targetBrokers = new HashSet<String>();
targetBrokers.add(broker1);
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>();
brokerConfigMap.put(broker2, new TopicConfigAndQueueMapping(new TopicConfig(topic, 0, 0), new TopicQueueMappingDetail(topic, 0, broker2, 0)));
TopicQueueMappingUtils.checkIfTargetBrokersComplete(targetBrokers, brokerConfigMap);
TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail(topic, 0, broker2, 0);
mappingDetail.getHostedQueues().put(1, new ArrayList<LogicQueueMappingItem>());
brokerConfigMap.put(broker2, new TopicConfigAndQueueMapping(new TopicConfig(topic, 0, 0), mappingDetail));
TopicQueueMappingUtils.checkTargetBrokersComplete(targetBrokers, brokerConfigMap);
}
......@@ -99,28 +126,36 @@ public class TopicMappingUtilsTest {
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>();
for (int i = 1; i < 10; i++) {
Set<String> targetBrokers = buildTargetBrokers(2 * i);
Set<String> nonTargetBrokers = buildTargetBrokers(2 * i, "test");
queueNum = 10 * i;
TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap);
TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, nonTargetBrokers, brokerConfigMap);
Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap);
Assert.assertEquals(2 * i, brokerConfigMap.size());
Assert.assertEquals(4 * i, brokerConfigMap.size());
//do the check manually
TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap);
Map.Entry<Long, Integer> maxEpochAndNum = TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap);
Assert.assertEquals(queueNum, maxEpochAndNum.getValue().longValue());
Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<TopicQueueMappingDetail>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
TopicQueueMappingUtils.checkIfReusePhysicalQueue(globalIdMap.values());
TopicQueueMappingUtils.checkPhysicalQueueConsistence(brokerConfigMap);
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
TopicConfigAndQueueMapping configMapping = entry.getValue();
Assert.assertEquals(5, configMapping.getReadQueueNums());
Assert.assertEquals(5, configMapping.getWriteQueueNums());
Assert.assertTrue(configMapping.getMappingDetail().epoch > System.currentTimeMillis());
for (List<LogicQueueMappingItem> items: configMapping.getMappingDetail().getHostedQueues().values()) {
for (LogicQueueMappingItem item: items) {
Assert.assertEquals(0, item.getStartOffset());
Assert.assertEquals(0, item.getLogicOffset());
TopicConfig topicConfig = brokerConfigMap.get(item.getBname());
Assert.assertTrue(item.getQueueId() < topicConfig.getWriteQueueNums());
if (nonTargetBrokers.contains(configMapping.getMappingDetail().bname)) {
Assert.assertEquals(0, configMapping.getReadQueueNums());
Assert.assertEquals(0, configMapping.getWriteQueueNums());
Assert.assertEquals(0, configMapping.getMappingDetail().getHostedQueues().size());
} else {
Assert.assertEquals(5, configMapping.getReadQueueNums());
Assert.assertEquals(5, configMapping.getWriteQueueNums());
Assert.assertTrue(configMapping.getMappingDetail().epoch > System.currentTimeMillis());
for (List<LogicQueueMappingItem> items: configMapping.getMappingDetail().getHostedQueues().values()) {
for (LogicQueueMappingItem item: items) {
Assert.assertEquals(0, item.getStartOffset());
Assert.assertEquals(0, item.getLogicOffset());
TopicConfig topicConfig = brokerConfigMap.get(item.getBname());
Assert.assertTrue(item.getQueueId() < topicConfig.getWriteQueueNums());
}
}
}
}
......@@ -133,7 +168,7 @@ public class TopicMappingUtilsTest {
int queueNum = 7;
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>();
Set<String> originalBrokers = buildTargetBrokers(2);
TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, originalBrokers, brokerConfigMap);
TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, originalBrokers, new HashSet<String>(), brokerConfigMap);
Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap);
Assert.assertEquals(2, brokerConfigMap.size());
......@@ -171,6 +206,27 @@ public class TopicMappingUtilsTest {
}
}
@Test
public void testRemappingStaticTopicStability() {
String topic = "static";
int queueNum = 7;
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>();
Set<String> originalBrokers = buildTargetBrokers(2);
{
TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, originalBrokers, new HashSet<String>(), brokerConfigMap);
Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap);
Assert.assertEquals(2, brokerConfigMap.size());
}
for (int i = 0; i < 10; i++) {
TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.remappingStaticTopic(topic, brokerConfigMap, originalBrokers);
Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap);
Assert.assertEquals(2, brokerConfigMap.size());
Assert.assertTrue(wrapper.getBrokerToMapIn().isEmpty());
Assert.assertTrue(wrapper.getBrokerToMapOut().isEmpty());
}
}
@Test
public void testUtilsCheck() {
......@@ -178,7 +234,7 @@ public class TopicMappingUtilsTest {
int queueNum = 10;
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>();
Set<String> targetBrokers = buildTargetBrokers(2);
TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap);
TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, new HashSet<String>(), brokerConfigMap);
Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap);
Assert.assertEquals(2, brokerConfigMap.size());
TopicConfigAndQueueMapping configMapping = brokerConfigMap.values().iterator().next();
......
......@@ -16,6 +16,7 @@ import org.junit.FixMethodOrder;
import org.junit.Test;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
......@@ -45,7 +46,7 @@ public class StaticTopicIT extends BaseConf {
public Map<String, TopicConfigAndQueueMapping> createStaticTopic(String topic, int queueNum, Set<String> targetBrokers) throws Exception {
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
Assert.assertTrue(brokerConfigMap.isEmpty());
TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap);
TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, new HashSet<>(), brokerConfigMap);
Assert.assertEquals(2, brokerConfigMap.size());
//If some succeed, and others fail, it will cause inconsistent data
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
......
......@@ -217,7 +217,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
}
//calculate the new data
TopicRemappingDetailWrapper newWrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap);
TopicRemappingDetailWrapper newWrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, new HashSet<>(), brokerConfigMap);
{
String newMappingDataFile = TopicQueueMappingUtils.writeToTemp(newWrapper, true);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册