diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java new file mode 100644 index 0000000000000000000000000000000000000000..77198b7fdc83b7c485da01e223e49d4cad012c0e --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.consumer.rebalance; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; +import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.common.consistenthash.ConsistentHashRouter; +import org.apache.rocketmq.common.consistenthash.HashFunction; +import org.apache.rocketmq.common.consistenthash.Node; +import org.apache.rocketmq.common.message.MessageQueue; +import org.slf4j.Logger; + +/** + * Consistent Hashing queue algorithm + */ +public class AllocateMessageQueueConsistentHash implements AllocateMessageQueueStrategy { + private final Logger log = ClientLogger.getLog(); + + private final int virtualNodeCnt; + private final HashFunction customHashFunction; + + public AllocateMessageQueueConsistentHash() { + this(10); + } + + public AllocateMessageQueueConsistentHash(int virtualNodeCnt) { + this(virtualNodeCnt,null); + } + + public AllocateMessageQueueConsistentHash(int virtualNodeCnt, HashFunction customHashFunction) { + if (virtualNodeCnt < 0) { + throw new IllegalArgumentException("illegal virtualNodeCnt :" + virtualNodeCnt); + } + this.virtualNodeCnt = virtualNodeCnt; + this.customHashFunction = customHashFunction; + } + + @Override + public List allocate(String consumerGroup, String currentCID, List mqAll, + List cidAll) { + + if (currentCID == null || currentCID.length() < 1) { + throw new IllegalArgumentException("currentCID is empty"); + } + if (mqAll == null || mqAll.isEmpty()) { + throw new IllegalArgumentException("mqAll is null or mqAll empty"); + } + if (cidAll == null || cidAll.isEmpty()) { + throw new IllegalArgumentException("cidAll is null or cidAll empty"); + } + + List result = new ArrayList(); + if (!cidAll.contains(currentCID)) { + log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}", + consumerGroup, + currentCID, + cidAll); + return result; + } + + + Collection cidNodes = new ArrayList<>(); + for (String cid : cidAll) { + cidNodes.add(new ClientNode(cid)); + } + + final ConsistentHashRouter router; //for building hash ring + if (customHashFunction != null) { + router = new ConsistentHashRouter<>(cidNodes, virtualNodeCnt, customHashFunction); + } else { + router = new ConsistentHashRouter<>(cidNodes, virtualNodeCnt); + } + + List results = new ArrayList<>(); + for (MessageQueue mq : mqAll) { + ClientNode clientNode = router.routeNode(mq.toString()); + if (clientNode != null && currentCID.equals(clientNode.getKey())) { + results.add(mq); + } + } + + return results; + + } + + @Override + public String getName() { + return "CONSISTENT_HASH"; + } + + + private static class ClientNode implements Node { + private final String clientID; + + public ClientNode(String clientID) { + this.clientID = clientID; + } + + @Override + public String getKey() { + return clientID; + } + } + + + +} diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java new file mode 100644 index 0000000000000000000000000000000000000000..fc7ab9fcf4d154399a576aed890bd1e4a435e66b --- /dev/null +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.consumer.rebalance; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.TreeMap; +import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; +import org.apache.rocketmq.common.message.MessageQueue; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + + +public class AllocateMessageQueueConsitentHashTest { + + private String topic; + private static final String CID_PREFIX = "CID-"; + + @Before + public void init() { + topic = "topic_test"; + } + + + + public void printMessageQueue(List messageQueueList, String name) { + if (messageQueueList == null || messageQueueList.size() < 1) + return; + System.out.println(name + ".......................................start"); + for (MessageQueue messageQueue : messageQueueList) { + System.out.println(messageQueue); + } + System.out.println(name + ".......................................end"); + } + + @Test + public void testCurrentCIDNotExists() { + String currentCID = String.valueOf(Integer.MAX_VALUE); + List consumerIdList = createConsumerIdList(2); + List messageQueueList = createMessageQueueList(6); + List result = new AllocateMessageQueueConsistentHash().allocate("", currentCID, messageQueueList, consumerIdList); + printMessageQueue(result, "testCurrentCIDNotExists"); + Assert.assertEquals(result.size(), 0); + } + + @Test(expected = IllegalArgumentException.class) + public void testCurrentCIDIllegalArgument() { + List consumerIdList = createConsumerIdList(2); + List messageQueueList = createMessageQueueList(6); + new AllocateMessageQueueConsistentHash().allocate("", "", messageQueueList, consumerIdList); + } + + @Test(expected = IllegalArgumentException.class) + public void testMessageQueueIllegalArgument() { + String currentCID = "0"; + List consumerIdList = createConsumerIdList(2); + new AllocateMessageQueueConsistentHash().allocate("", currentCID, null, consumerIdList); + } + + @Test(expected = IllegalArgumentException.class) + public void testConsumerIdIllegalArgument() { + String currentCID = "0"; + List messageQueueList = createMessageQueueList(6); + new AllocateMessageQueueConsistentHash().allocate("", currentCID, messageQueueList, null); + } + + @Test + public void testAllocate1() { + testAllocate(20,10); + } + + @Test + public void testAllocate2() { + testAllocate(10,20); + } + + + @Test + public void testRun100RandomCase(){ + for(int i=0;i<100;i++){ + int consumerSize = new Random().nextInt(200)+1;//1-200 + int queueSize = new Random().nextInt(100)+1;//1-100 + testAllocate(queueSize,consumerSize); + try { + Thread.sleep(1); + } catch (InterruptedException e) {} + } + } + + + public void testAllocate(int queueSize, int consumerSize) { + AllocateMessageQueueStrategy allocateMessageQueueConsistentHash = new AllocateMessageQueueConsistentHash(3); + + List mqAll = createMessageQueueList(queueSize); + //System.out.println("mqAll:" + mqAll.toString()); + + List cidAll = createConsumerIdList(consumerSize); + List allocatedResAll = new ArrayList<>(); + + Map allocateToAllOrigin = new TreeMap<>(); + //test allocate all + { + + List cidBegin = new ArrayList<>(cidAll); + + //System.out.println("cidAll:" + cidBegin.toString()); + for (String cid : cidBegin) { + List rs = allocateMessageQueueConsistentHash.allocate("testConsumerGroup", cid, mqAll, cidBegin); + for (MessageQueue mq : rs) { + allocateToAllOrigin.put(mq, cid); + } + allocatedResAll.addAll(rs); + //System.out.println("rs[" + cid + "]:" + rs.toString()); + } + + Assert.assertTrue( + verifyAllocateAll(cidBegin,mqAll, allocatedResAll)); + } + + Map allocateToAllAfterRemoveOne = new TreeMap<>(); + List cidAfterRemoveOne = new ArrayList<>(cidAll); + //test allocate remove one cid + { + String removeCID = cidAfterRemoveOne.remove(0); + //System.out.println("removing one cid "+removeCID); + List mqShouldOnlyChanged = new ArrayList<>(); + Iterator> it = allocateToAllOrigin.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry entry = it.next(); + if (entry.getValue().equals(removeCID)) { + mqShouldOnlyChanged.add(entry.getKey()); + } + } + + //System.out.println("cidAll:" + cidAfterRemoveOne.toString()); + List allocatedResAllAfterRemove = new ArrayList<>(); + for (String cid : cidAfterRemoveOne) { + List rs = allocateMessageQueueConsistentHash.allocate("testConsumerGroup", cid, mqAll, cidAfterRemoveOne); + allocatedResAllAfterRemove.addAll(rs); + for (MessageQueue mq : rs) { + allocateToAllAfterRemoveOne.put(mq, cid); + } + //System.out.println("rs[" + cid + "]:" + "[" + rs.size() + "]" + rs.toString()); + } + + Assert.assertTrue("queueSize"+queueSize+"consumerSize:"+consumerSize+"\nmqAll:"+mqAll+"\nallocatedResAllAfterRemove"+allocatedResAllAfterRemove, + verifyAllocateAll(cidAfterRemoveOne, mqAll, allocatedResAllAfterRemove)); + verifyAfterRemove(allocateToAllOrigin, allocateToAllAfterRemoveOne, removeCID); + } + + List cidAfterAdd = new ArrayList<>(cidAfterRemoveOne); + //test allocate add one more cid + { + String newCid = CID_PREFIX+"NEW"; + //System.out.println("add one more cid "+newCid); + cidAfterAdd.add(newCid); + List mqShouldOnlyChanged = new ArrayList<>(); + //System.out.println("cidAll:" + cidAfterAdd.toString()); + List allocatedResAllAfterAdd = new ArrayList<>(); + Map allocateToAll3 = new TreeMap<>(); + for (String cid : cidAfterAdd) { + List rs = allocateMessageQueueConsistentHash.allocate("testConsumerGroup", cid, mqAll, cidAfterAdd); + allocatedResAllAfterAdd.addAll(rs); + for (MessageQueue mq : rs) { + allocateToAll3.put(mq, cid); + if (cid.equals(newCid)){ + mqShouldOnlyChanged.add(mq); + } + } + //System.out.println("rs[" + cid + "]:" + "[" + rs.size() + "]" + rs.toString()); + } + + Assert.assertTrue( + verifyAllocateAll(cidAfterAdd,mqAll, allocatedResAllAfterAdd)); + verifyAfterAdd(allocateToAllAfterRemoveOne, allocateToAll3, newCid); + } + } + + private boolean verifyAllocateAll(List cidAll,List mqAll, List allocatedResAll) { + if (cidAll.isEmpty()){ + return allocatedResAll.isEmpty(); + } + return mqAll.containsAll(allocatedResAll) && allocatedResAll.containsAll(mqAll); + } + + private void verifyAfterRemove(Map allocateToBefore, Map allocateAfter, String removeCID) { + for (MessageQueue mq : allocateToBefore.keySet()) { + String allocateToOrigin = allocateToBefore.get(mq); + if (allocateToOrigin.equals(removeCID)) { + + } else {//the rest queue should be the same + Assert.assertTrue(allocateAfter.get(mq).equals(allocateToOrigin));//should be the same + } + } + } + + private void verifyAfterAdd(Map allocateBefore, Map allocateAfter, String newCID) { + for (MessageQueue mq : allocateAfter.keySet()) { + String allocateToOrigin = allocateBefore.get(mq); + String allocateToAfter = allocateAfter.get(mq); + if (allocateToAfter.equals(newCID)) { + + } else {//the rest queue should be the same + Assert.assertTrue("it was allocated to "+allocateToOrigin+". Now, it is to "+allocateAfter.get(mq)+" mq:"+mq,allocateAfter.get(mq).equals(allocateToOrigin));//should be the same + } + } + } + + private List createConsumerIdList(int size) { + List consumerIdList = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + consumerIdList.add(CID_PREFIX + String.valueOf(i)); + } + return consumerIdList; + } + + private List createMessageQueueList(int size) { + List messageQueueList = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + MessageQueue mq = new MessageQueue(topic, "brokerName", i); + messageQueueList.add(mq); + } + return messageQueueList; + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java b/common/src/main/java/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java new file mode 100644 index 0000000000000000000000000000000000000000..8606c4330e130c670f51b2fcd1351510eed793ce --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.consistenthash; + +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Collection; +import java.util.Iterator; +import java.util.SortedMap; +import java.util.TreeMap; + +/** + * To hash Node objects to a hash ring with a certain amount of virtual node. + * Method routeNode will return a Node instance which the object key should be allocated to according to consistent hash algorithm + * + * @param + */ +public class ConsistentHashRouter { + private final SortedMap> ring = new TreeMap<>(); + private final HashFunction hashFunction; + + public ConsistentHashRouter(Collection pNodes, int vNodeCount) { + this(pNodes,vNodeCount, new MD5Hash()); + } + + /** + * + * @param pNodes collections of physical nodes + * @param vNodeCount amounts of virtual nodes + * @param hashFunction hash Function to hash Node instances + */ + public ConsistentHashRouter(Collection pNodes, int vNodeCount, HashFunction hashFunction) { + if (hashFunction == null) { + throw new NullPointerException("Hash Function is null"); + } + this.hashFunction = hashFunction; + if (pNodes != null) { + for (T pNode : pNodes) { + addNode(pNode, vNodeCount); + } + } + } + + /** + * add physic node to the hash ring with some virtual nodes + * @param pNode physical node needs added to hash ring + * @param vNodeCount the number of virtual node of the physical node. Value should be greater than or equals to 0 + */ + public void addNode(T pNode, int vNodeCount) { + if (vNodeCount < 0) throw new IllegalArgumentException("illegal virtual node counts :" + vNodeCount); + int existingReplicas = getExistingReplicas(pNode); + for (int i = 0; i < vNodeCount; i++) { + VirtualNode vNode = new VirtualNode<>(pNode, i + existingReplicas); + ring.put(hashFunction.hash(vNode.getKey()), vNode); + } + } + + /** + * remove the physical node from the hash ring + * @param pNode + */ + public void removeNode(T pNode) { + Iterator it = ring.keySet().iterator(); + while (it.hasNext()) { + Long key = it.next(); + VirtualNode virtualNode = ring.get(key); + if (virtualNode.isVirtualNodeOf(pNode)) { + it.remove(); + } + } + } + + /** + * with a specified key, route the nearest Node instance in the current hash ring + * @param objectKey the object key to find a nearest Node + * @return + */ + public T routeNode(String objectKey) { + if (ring.isEmpty()) { + return null; + } + Long hashVal = hashFunction.hash(objectKey); + SortedMap> tailMap = ring.tailMap(hashVal); + Long nodeHashVal = !tailMap.isEmpty() ? tailMap.firstKey() : ring.firstKey(); + return ring.get(nodeHashVal).getPhysicalNode(); + } + + + public int getExistingReplicas(T pNode) { + int replicas = 0; + for (VirtualNode vNode : ring.values()) { + if (vNode.isVirtualNodeOf(pNode)) { + replicas++; + } + } + return replicas; + } + + + //default hash function + private static class MD5Hash implements HashFunction { + MessageDigest instance; + + public MD5Hash() { + try { + instance = MessageDigest.getInstance("MD5"); + } catch (NoSuchAlgorithmException e) { + } + } + + @Override + public long hash(String key) { + instance.reset(); + instance.update(key.getBytes()); + byte[] digest = instance.digest(); + + long h = 0; + for (int i = 0; i < 4; i++) { + h <<= 8; + h |= ((int) digest[i]) & 0xFF; + } + return h; + } + } + +} diff --git a/common/src/main/java/org/apache/rocketmq/common/consistenthash/HashFunction.java b/common/src/main/java/org/apache/rocketmq/common/consistenthash/HashFunction.java new file mode 100644 index 0000000000000000000000000000000000000000..58fd777cad6ef332e4567a3a65e2255d6bc997cf --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/consistenthash/HashFunction.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.consistenthash; + +/** + * Hash String to long value + */ +public interface HashFunction { + long hash(String key); +} diff --git a/common/src/main/java/org/apache/rocketmq/common/consistenthash/Node.java b/common/src/main/java/org/apache/rocketmq/common/consistenthash/Node.java new file mode 100644 index 0000000000000000000000000000000000000000..0ece21098a36eae1e01435bbdc9c2e780346bf1e --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/consistenthash/Node.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.consistenthash; + +/** + * Represent a node which should be mapped to a hash ring + */ +public interface Node { + /** + * + * @return the key which will be used for hash mapping + */ + String getKey(); +} diff --git a/common/src/main/java/org/apache/rocketmq/common/consistenthash/VirtualNode.java b/common/src/main/java/org/apache/rocketmq/common/consistenthash/VirtualNode.java new file mode 100644 index 0000000000000000000000000000000000000000..c8b72d9018b5201c9cd792f1bdd0f6b8d074a540 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/consistenthash/VirtualNode.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.consistenthash; + + +public class VirtualNode implements Node { + final T physicalNode; + final int replicaIndex; + + public VirtualNode(T physicalNode, int replicaIndex) { + this.replicaIndex = replicaIndex; + this.physicalNode = physicalNode; + } + + @Override + public String getKey() { + return physicalNode.getKey() + "-" + replicaIndex; + } + + public boolean isVirtualNodeOf(T pNode) { + return physicalNode.getKey().equals(pNode.getKey()); + } + + public T getPhysicalNode() { + return physicalNode; + } +}