diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearby.java b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearby.java new file mode 100644 index 0000000000000000000000000000000000000000..9b166e7a96c578e461aa870cddfd35fe6e171719 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearby.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.consumer.rebalance; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; +import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.common.message.MessageQueue; +import org.slf4j.Logger; + +/** + * An allocate strategy proxy for based on machine room nearside priority. An actual allocate strategy can be + * specified. + * + * If any consumer is alive in a machine room, the message queue of the broker which is deployed in the same machine + * should only be allocated to those. Otherwise, those message queues can be shared along all consumers since there are + * no alive consumer to monopolize them. + */ +public class AllocateMachineRoomNearby implements AllocateMessageQueueStrategy { + private final Logger log = ClientLogger.getLog(); + + private final AllocateMessageQueueStrategy allocateMessageQueueStrategy;//actual allocate strategy + private final MachineRoomResolver machineRoomResolver; + + public AllocateMachineRoomNearby(AllocateMessageQueueStrategy allocateMessageQueueStrategy, + MachineRoomResolver machineRoomResolver) throws NullPointerException { + if (allocateMessageQueueStrategy == null) { + throw new NullPointerException("allocateMessageQueueStrategy is null"); + } + + if (machineRoomResolver == null) { + throw new NullPointerException("machineRoomResolver is null"); + } + + this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; + this.machineRoomResolver = machineRoomResolver; + } + + @Override + public List allocate(String consumerGroup, String currentCID, List mqAll, + List cidAll) { + if (currentCID == null || currentCID.length() < 1) { + throw new IllegalArgumentException("currentCID is empty"); + } + if (mqAll == null || mqAll.isEmpty()) { + throw new IllegalArgumentException("mqAll is null or mqAll empty"); + } + if (cidAll == null || cidAll.isEmpty()) { + throw new IllegalArgumentException("cidAll is null or cidAll empty"); + } + + List result = new ArrayList(); + if (!cidAll.contains(currentCID)) { + log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}", + consumerGroup, + currentCID, + cidAll); + return result; + } + + //group mq by machine room + Map> mr2Mq = new TreeMap>(); + for (MessageQueue mq : mqAll) { + String brokerMachineRoom = machineRoomResolver.brokerDeployIn(mq); + if (StringUtils.isNoneEmpty(brokerMachineRoom)) { + if (mr2Mq.get(brokerMachineRoom) == null) { + mr2Mq.put(brokerMachineRoom, new ArrayList()); + } + mr2Mq.get(brokerMachineRoom).add(mq); + } else { + throw new IllegalArgumentException("Machine room is null for mq " + mq); + } + } + + //group consumer by machine room + Map> mr2c = new TreeMap>(); + for (String cid : cidAll) { + String consumerMachineRoom = machineRoomResolver.consumerDeployIn(cid); + if (StringUtils.isNoneEmpty(consumerMachineRoom)) { + if (mr2c.get(consumerMachineRoom) == null) { + mr2c.put(consumerMachineRoom, new ArrayList()); + } + mr2c.get(consumerMachineRoom).add(cid); + } else { + throw new IllegalArgumentException("Machine room is null for consumer id " + cid); + } + } + + List allocateResults = new ArrayList(); + + //1.allocate the mq that deploy in the same machine room with the current consumer + String currentMachineRoom = machineRoomResolver.consumerDeployIn(currentCID); + List mqInThisMachineRoom = mr2Mq.remove(currentMachineRoom); + List consumerInThisMachineRoom = mr2c.get(currentMachineRoom); + if (mqInThisMachineRoom != null && !mqInThisMachineRoom.isEmpty()) { + allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mqInThisMachineRoom, consumerInThisMachineRoom)); + } + + //2.allocate the rest mq to each machine room if there are no consumer alive in that machine room + for (String machineRoom : mr2Mq.keySet()) { + if (!mr2c.containsKey(machineRoom)) { // no alive consumer in the corresponding machine room, so all consumers share these queues + allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mr2Mq.get(machineRoom), cidAll)); + } + } + + return allocateResults; + } + + @Override + public String getName() { + return "MACHINE_ROOM_NEARBY" + "-" + allocateMessageQueueStrategy.getName(); + } + + /** + * A resolver object to determine which machine room do the message queues or clients are deployed in. + * + * AllocateMachineRoomNearby will use the results to group the message queues and clients by machine room. + * + * The result returned from the implemented method CANNOT be null. + */ + public interface MachineRoomResolver { + String brokerDeployIn(MessageQueue messageQueue); + + String consumerDeployIn(String clientID); + } +} diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearByTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearByTest.java new file mode 100644 index 0000000000000000000000000000000000000000..0d394c382311fb2140aaa655443677d5b6a5902c --- /dev/null +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearByTest.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.consumer.rebalance; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; +import org.apache.rocketmq.common.message.MessageQueue; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + + +public class AllocateMachineRoomNearByTest { + + private static final String CID_PREFIX = "CID-"; + + private final String topic = "topic_test"; + private final AllocateMachineRoomNearby.MachineRoomResolver machineRoomResolver = new AllocateMachineRoomNearby.MachineRoomResolver() { + @Override public String brokerDeployIn(MessageQueue messageQueue) { + return messageQueue.getBrokerName().split("-")[0]; + } + + @Override public String consumerDeployIn(String clientID) { + return clientID.split("-")[0]; + } + }; + private final AllocateMessageQueueStrategy allocateMessageQueueStrategy = new AllocateMachineRoomNearby(new AllocateMessageQueueAveragely(), machineRoomResolver); + + + @Before + public void init() { + } + + + @Test + public void test1() { + testWhenIDCSizeEquals(5,20,10, false); + testWhenIDCSizeEquals(5,20,20, false); + testWhenIDCSizeEquals(5,20,30, false); + testWhenIDCSizeEquals(5,20,0, false ); + } + + @Test + public void test2() { + testWhenConsumerIDCIsMore(5,1,10, 10, false); + testWhenConsumerIDCIsMore(5,1,10, 5, false); + testWhenConsumerIDCIsMore(5,1,10, 20, false); + testWhenConsumerIDCIsMore(5,1,10, 0, false); + } + + @Test + public void test3() { + testWhenConsumerIDCIsLess(5,2,10, 10, false); + testWhenConsumerIDCIsLess(5,2,10, 5, false); + testWhenConsumerIDCIsLess(5,2,10, 20, false); + testWhenConsumerIDCIsLess(5,2,10, 0, false); + } + + + @Test + public void testRun10RandomCase(){ + for(int i=0;i<10;i++){ + int consumerSize = new Random().nextInt(200)+1;//1-200 + int queueSize = new Random().nextInt(100)+1;//1-100 + int brokerIDCSize = new Random().nextInt(10)+1;//1-10 + int consumerIDCSize = new Random().nextInt(10)+1;//1-10 + + if (brokerIDCSize == consumerIDCSize) { + testWhenIDCSizeEquals(brokerIDCSize,queueSize,consumerSize,false); + } + else if (brokerIDCSize > consumerIDCSize) { + testWhenConsumerIDCIsLess(brokerIDCSize,brokerIDCSize- consumerIDCSize, queueSize, consumerSize, false); + } else { + testWhenConsumerIDCIsMore(brokerIDCSize, consumerIDCSize - brokerIDCSize, queueSize, consumerSize, false); + } + } + } + + + + + public void testWhenIDCSizeEquals(int IDCSize, int queueSize, int consumerSize, boolean print) { + if (print) { + System.out.println("Test : IDCSize = "+ IDCSize +"queueSize = " + queueSize +" consumerSize = " + consumerSize); + } + List cidAll = prepareConsumer(IDCSize, consumerSize); + List mqAll = prepareMQ(IDCSize, queueSize); + List resAll = new ArrayList(); + for (String currentID : cidAll) { + List res = allocateMessageQueueStrategy.allocate("Test-C-G",currentID,mqAll,cidAll); + if (print) { + System.out.println("cid: "+currentID+"--> res :" +res); + } + for (MessageQueue mq : res) { + Assert.assertTrue(machineRoomResolver.brokerDeployIn(mq).equals(machineRoomResolver.consumerDeployIn(currentID))); + } + resAll.addAll(res); + } + Assert.assertTrue(hasAllocateAllQ(cidAll,mqAll,resAll)); + + if (print) { + System.out.println("-------------------------------------------------------------------"); + } + } + + public void testWhenConsumerIDCIsMore(int brokerIDCSize, int consumerMore, int queueSize, int consumerSize, boolean print) { + if (print) { + System.out.println("Test : IDCSize = "+ brokerIDCSize +" queueSize = " + queueSize +" consumerSize = " + consumerSize); + } + Set brokerIDCWithConsumer = new TreeSet(); + List cidAll = prepareConsumer(brokerIDCSize +consumerMore, consumerSize); + List mqAll = prepareMQ(brokerIDCSize, queueSize); + for (MessageQueue mq : mqAll) { + brokerIDCWithConsumer.add(machineRoomResolver.brokerDeployIn(mq)); + } + + List resAll = new ArrayList(); + for (String currentID : cidAll) { + List res = allocateMessageQueueStrategy.allocate("Test-C-G",currentID,mqAll,cidAll); + if (print) { + System.out.println("cid: "+currentID+"--> res :" +res); + } + for (MessageQueue mq : res) { + if (brokerIDCWithConsumer.contains(machineRoomResolver.brokerDeployIn(mq))) {//healthy idc, so only consumer in this idc should be allocated + Assert.assertTrue(machineRoomResolver.brokerDeployIn(mq).equals(machineRoomResolver.consumerDeployIn(currentID))); + } + } + resAll.addAll(res); + } + + Assert.assertTrue(hasAllocateAllQ(cidAll,mqAll,resAll)); + if (print) { + System.out.println("-------------------------------------------------------------------"); + } + } + + public void testWhenConsumerIDCIsLess(int brokerIDCSize, int consumerIDCLess, int queueSize, int consumerSize, boolean print) { + if (print) { + System.out.println("Test : IDCSize = "+ brokerIDCSize +" queueSize = " + queueSize +" consumerSize = " + consumerSize); + } + Set healthyIDC = new TreeSet(); + List cidAll = prepareConsumer(brokerIDCSize - consumerIDCLess, consumerSize); + List mqAll = prepareMQ(brokerIDCSize, queueSize); + for (String cid : cidAll) { + healthyIDC.add(machineRoomResolver.consumerDeployIn(cid)); + } + + List resAll = new ArrayList(); + Map> idc2Res = new TreeMap>(); + for (String currentID : cidAll) { + String currentIDC = machineRoomResolver.consumerDeployIn(currentID); + List res = allocateMessageQueueStrategy.allocate("Test-C-G",currentID,mqAll,cidAll); + if (print) { + System.out.println("cid: "+currentID+"--> res :" +res); + } + if ( !idc2Res.containsKey(currentIDC)) { + idc2Res.put(currentIDC, new ArrayList()); + } + idc2Res.get(currentIDC).addAll(res); + resAll.addAll(res); + } + + for (String consumerIDC : healthyIDC) { + List resInOneIDC = idc2Res.get(consumerIDC); + List mqInThisIDC = createMessageQueueList(consumerIDC,queueSize); + Assert.assertTrue(resInOneIDC.containsAll(mqInThisIDC)); + } + + Assert.assertTrue(hasAllocateAllQ(cidAll,mqAll,resAll)); + if (print) { + System.out.println("-------------------------------------------------------------------"); + } + } + + + private boolean hasAllocateAllQ(List cidAll,List mqAll, List allocatedResAll) { + if (cidAll.isEmpty()){ + return allocatedResAll.isEmpty(); + } + return mqAll.containsAll(allocatedResAll) && allocatedResAll.containsAll(mqAll) && mqAll.size() == allocatedResAll.size(); + } + + + private List createConsumerIdList(String machineRoom, int size) { + List consumerIdList = new ArrayList(size); + for (int i = 0; i < size; i++) { + consumerIdList.add(machineRoom +"-"+CID_PREFIX + String.valueOf(i)); + } + return consumerIdList; + } + + private List createMessageQueueList(String machineRoom, int size) { + List messageQueueList = new ArrayList(size); + for (int i = 0; i < size; i++) { + MessageQueue mq = new MessageQueue(topic, machineRoom+"-brokerName", i); + messageQueueList.add(mq); + } + return messageQueueList; + } + + private List prepareMQ(int brokerIDCSize, int queueSize) { + List mqAll = new ArrayList(); + for (int i=1;i<=brokerIDCSize;i++) { + mqAll.addAll(createMessageQueueList("IDC"+i, queueSize)); + } + + return mqAll; + } + + private List prepareConsumer( int IDCSize, int consumerSize) { + List cidAll = new ArrayList(); + for (int i=1;i<=IDCSize;i++) { + cidAll.addAll(createConsumerIdList("IDC"+i, consumerSize)); + } + return cidAll; + } +}