提交 787d1286 编写于 作者: J Jaskey 提交者: dongeforever

[ROCKETMQ-67] Consistent Hash allocate strategy closes apache/incubator-rocketmq#67

上级 3292d039
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.consumer.rebalance;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.consistenthash.ConsistentHashRouter;
import org.apache.rocketmq.common.consistenthash.HashFunction;
import org.apache.rocketmq.common.consistenthash.Node;
import org.apache.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;
/**
* Consistent Hashing queue algorithm
*/
public class AllocateMessageQueueConsistentHash implements AllocateMessageQueueStrategy {
private final Logger log = ClientLogger.getLog();
private final int virtualNodeCnt;
private final HashFunction customHashFunction;
public AllocateMessageQueueConsistentHash() {
this(10);
}
public AllocateMessageQueueConsistentHash(int virtualNodeCnt) {
this(virtualNodeCnt,null);
}
public AllocateMessageQueueConsistentHash(int virtualNodeCnt, HashFunction customHashFunction) {
if (virtualNodeCnt < 0) {
throw new IllegalArgumentException("illegal virtualNodeCnt :" + virtualNodeCnt);
}
this.virtualNodeCnt = virtualNodeCnt;
this.customHashFunction = customHashFunction;
}
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
if (currentCID == null || currentCID.length() < 1) {
throw new IllegalArgumentException("currentCID is empty");
}
if (mqAll == null || mqAll.isEmpty()) {
throw new IllegalArgumentException("mqAll is null or mqAll empty");
}
if (cidAll == null || cidAll.isEmpty()) {
throw new IllegalArgumentException("cidAll is null or cidAll empty");
}
List<MessageQueue> result = new ArrayList<MessageQueue>();
if (!cidAll.contains(currentCID)) {
log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
consumerGroup,
currentCID,
cidAll);
return result;
}
Collection<ClientNode> cidNodes = new ArrayList<>();
for (String cid : cidAll) {
cidNodes.add(new ClientNode(cid));
}
final ConsistentHashRouter<ClientNode> router; //for building hash ring
if (customHashFunction != null) {
router = new ConsistentHashRouter<>(cidNodes, virtualNodeCnt, customHashFunction);
} else {
router = new ConsistentHashRouter<>(cidNodes, virtualNodeCnt);
}
List<MessageQueue> results = new ArrayList<>();
for (MessageQueue mq : mqAll) {
ClientNode clientNode = router.routeNode(mq.toString());
if (clientNode != null && currentCID.equals(clientNode.getKey())) {
results.add(mq);
}
}
return results;
}
@Override
public String getName() {
return "CONSISTENT_HASH";
}
private static class ClientNode implements Node {
private final String clientID;
public ClientNode(String clientID) {
this.clientID = clientID;
}
@Override
public String getKey() {
return clientID;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.consumer.rebalance;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.message.MessageQueue;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class AllocateMessageQueueConsitentHashTest {
private String topic;
private static final String CID_PREFIX = "CID-";
@Before
public void init() {
topic = "topic_test";
}
public void printMessageQueue(List<MessageQueue> messageQueueList, String name) {
if (messageQueueList == null || messageQueueList.size() < 1)
return;
System.out.println(name + ".......................................start");
for (MessageQueue messageQueue : messageQueueList) {
System.out.println(messageQueue);
}
System.out.println(name + ".......................................end");
}
@Test
public void testCurrentCIDNotExists() {
String currentCID = String.valueOf(Integer.MAX_VALUE);
List<String> consumerIdList = createConsumerIdList(2);
List<MessageQueue> messageQueueList = createMessageQueueList(6);
List<MessageQueue> result = new AllocateMessageQueueConsistentHash().allocate("", currentCID, messageQueueList, consumerIdList);
printMessageQueue(result, "testCurrentCIDNotExists");
Assert.assertEquals(result.size(), 0);
}
@Test(expected = IllegalArgumentException.class)
public void testCurrentCIDIllegalArgument() {
List<String> consumerIdList = createConsumerIdList(2);
List<MessageQueue> messageQueueList = createMessageQueueList(6);
new AllocateMessageQueueConsistentHash().allocate("", "", messageQueueList, consumerIdList);
}
@Test(expected = IllegalArgumentException.class)
public void testMessageQueueIllegalArgument() {
String currentCID = "0";
List<String> consumerIdList = createConsumerIdList(2);
new AllocateMessageQueueConsistentHash().allocate("", currentCID, null, consumerIdList);
}
@Test(expected = IllegalArgumentException.class)
public void testConsumerIdIllegalArgument() {
String currentCID = "0";
List<MessageQueue> messageQueueList = createMessageQueueList(6);
new AllocateMessageQueueConsistentHash().allocate("", currentCID, messageQueueList, null);
}
@Test
public void testAllocate1() {
testAllocate(20,10);
}
@Test
public void testAllocate2() {
testAllocate(10,20);
}
@Test
public void testRun100RandomCase(){
for(int i=0;i<100;i++){
int consumerSize = new Random().nextInt(200)+1;//1-200
int queueSize = new Random().nextInt(100)+1;//1-100
testAllocate(queueSize,consumerSize);
try {
Thread.sleep(1);
} catch (InterruptedException e) {}
}
}
public void testAllocate(int queueSize, int consumerSize) {
AllocateMessageQueueStrategy allocateMessageQueueConsistentHash = new AllocateMessageQueueConsistentHash(3);
List<MessageQueue> mqAll = createMessageQueueList(queueSize);
//System.out.println("mqAll:" + mqAll.toString());
List<String> cidAll = createConsumerIdList(consumerSize);
List<MessageQueue> allocatedResAll = new ArrayList<>();
Map<MessageQueue, String> allocateToAllOrigin = new TreeMap<>();
//test allocate all
{
List<String> cidBegin = new ArrayList<>(cidAll);
//System.out.println("cidAll:" + cidBegin.toString());
for (String cid : cidBegin) {
List<MessageQueue> rs = allocateMessageQueueConsistentHash.allocate("testConsumerGroup", cid, mqAll, cidBegin);
for (MessageQueue mq : rs) {
allocateToAllOrigin.put(mq, cid);
}
allocatedResAll.addAll(rs);
//System.out.println("rs[" + cid + "]:" + rs.toString());
}
Assert.assertTrue(
verifyAllocateAll(cidBegin,mqAll, allocatedResAll));
}
Map<MessageQueue, String> allocateToAllAfterRemoveOne = new TreeMap<>();
List<String> cidAfterRemoveOne = new ArrayList<>(cidAll);
//test allocate remove one cid
{
String removeCID = cidAfterRemoveOne.remove(0);
//System.out.println("removing one cid "+removeCID);
List<MessageQueue> mqShouldOnlyChanged = new ArrayList<>();
Iterator<Map.Entry<MessageQueue, String>> it = allocateToAllOrigin.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<MessageQueue, String> entry = it.next();
if (entry.getValue().equals(removeCID)) {
mqShouldOnlyChanged.add(entry.getKey());
}
}
//System.out.println("cidAll:" + cidAfterRemoveOne.toString());
List<MessageQueue> allocatedResAllAfterRemove = new ArrayList<>();
for (String cid : cidAfterRemoveOne) {
List<MessageQueue> rs = allocateMessageQueueConsistentHash.allocate("testConsumerGroup", cid, mqAll, cidAfterRemoveOne);
allocatedResAllAfterRemove.addAll(rs);
for (MessageQueue mq : rs) {
allocateToAllAfterRemoveOne.put(mq, cid);
}
//System.out.println("rs[" + cid + "]:" + "[" + rs.size() + "]" + rs.toString());
}
Assert.assertTrue("queueSize"+queueSize+"consumerSize:"+consumerSize+"\nmqAll:"+mqAll+"\nallocatedResAllAfterRemove"+allocatedResAllAfterRemove,
verifyAllocateAll(cidAfterRemoveOne, mqAll, allocatedResAllAfterRemove));
verifyAfterRemove(allocateToAllOrigin, allocateToAllAfterRemoveOne, removeCID);
}
List<String> cidAfterAdd = new ArrayList<>(cidAfterRemoveOne);
//test allocate add one more cid
{
String newCid = CID_PREFIX+"NEW";
//System.out.println("add one more cid "+newCid);
cidAfterAdd.add(newCid);
List<MessageQueue> mqShouldOnlyChanged = new ArrayList<>();
//System.out.println("cidAll:" + cidAfterAdd.toString());
List<MessageQueue> allocatedResAllAfterAdd = new ArrayList<>();
Map<MessageQueue, String> allocateToAll3 = new TreeMap<>();
for (String cid : cidAfterAdd) {
List<MessageQueue> rs = allocateMessageQueueConsistentHash.allocate("testConsumerGroup", cid, mqAll, cidAfterAdd);
allocatedResAllAfterAdd.addAll(rs);
for (MessageQueue mq : rs) {
allocateToAll3.put(mq, cid);
if (cid.equals(newCid)){
mqShouldOnlyChanged.add(mq);
}
}
//System.out.println("rs[" + cid + "]:" + "[" + rs.size() + "]" + rs.toString());
}
Assert.assertTrue(
verifyAllocateAll(cidAfterAdd,mqAll, allocatedResAllAfterAdd));
verifyAfterAdd(allocateToAllAfterRemoveOne, allocateToAll3, newCid);
}
}
private boolean verifyAllocateAll(List<String> cidAll,List<MessageQueue> mqAll, List<MessageQueue> allocatedResAll) {
if (cidAll.isEmpty()){
return allocatedResAll.isEmpty();
}
return mqAll.containsAll(allocatedResAll) && allocatedResAll.containsAll(mqAll);
}
private void verifyAfterRemove(Map<MessageQueue, String> allocateToBefore, Map<MessageQueue, String> allocateAfter, String removeCID) {
for (MessageQueue mq : allocateToBefore.keySet()) {
String allocateToOrigin = allocateToBefore.get(mq);
if (allocateToOrigin.equals(removeCID)) {
} else {//the rest queue should be the same
Assert.assertTrue(allocateAfter.get(mq).equals(allocateToOrigin));//should be the same
}
}
}
private void verifyAfterAdd(Map<MessageQueue, String> allocateBefore, Map<MessageQueue, String> allocateAfter, String newCID) {
for (MessageQueue mq : allocateAfter.keySet()) {
String allocateToOrigin = allocateBefore.get(mq);
String allocateToAfter = allocateAfter.get(mq);
if (allocateToAfter.equals(newCID)) {
} else {//the rest queue should be the same
Assert.assertTrue("it was allocated to "+allocateToOrigin+". Now, it is to "+allocateAfter.get(mq)+" mq:"+mq,allocateAfter.get(mq).equals(allocateToOrigin));//should be the same
}
}
}
private List<String> createConsumerIdList(int size) {
List<String> consumerIdList = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
consumerIdList.add(CID_PREFIX + String.valueOf(i));
}
return consumerIdList;
}
private List<MessageQueue> createMessageQueueList(int size) {
List<MessageQueue> messageQueueList = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
MessageQueue mq = new MessageQueue(topic, "brokerName", i);
messageQueueList.add(mq);
}
return messageQueueList;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.consistenthash;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Collection;
import java.util.Iterator;
import java.util.SortedMap;
import java.util.TreeMap;
/**
* To hash Node objects to a hash ring with a certain amount of virtual node.
* Method routeNode will return a Node instance which the object key should be allocated to according to consistent hash algorithm
*
* @param <T>
*/
public class ConsistentHashRouter<T extends Node> {
private final SortedMap<Long, VirtualNode<T>> ring = new TreeMap<>();
private final HashFunction hashFunction;
public ConsistentHashRouter(Collection<T> pNodes, int vNodeCount) {
this(pNodes,vNodeCount, new MD5Hash());
}
/**
*
* @param pNodes collections of physical nodes
* @param vNodeCount amounts of virtual nodes
* @param hashFunction hash Function to hash Node instances
*/
public ConsistentHashRouter(Collection<T> pNodes, int vNodeCount, HashFunction hashFunction) {
if (hashFunction == null) {
throw new NullPointerException("Hash Function is null");
}
this.hashFunction = hashFunction;
if (pNodes != null) {
for (T pNode : pNodes) {
addNode(pNode, vNodeCount);
}
}
}
/**
* add physic node to the hash ring with some virtual nodes
* @param pNode physical node needs added to hash ring
* @param vNodeCount the number of virtual node of the physical node. Value should be greater than or equals to 0
*/
public void addNode(T pNode, int vNodeCount) {
if (vNodeCount < 0) throw new IllegalArgumentException("illegal virtual node counts :" + vNodeCount);
int existingReplicas = getExistingReplicas(pNode);
for (int i = 0; i < vNodeCount; i++) {
VirtualNode<T> vNode = new VirtualNode<>(pNode, i + existingReplicas);
ring.put(hashFunction.hash(vNode.getKey()), vNode);
}
}
/**
* remove the physical node from the hash ring
* @param pNode
*/
public void removeNode(T pNode) {
Iterator<Long> it = ring.keySet().iterator();
while (it.hasNext()) {
Long key = it.next();
VirtualNode<T> virtualNode = ring.get(key);
if (virtualNode.isVirtualNodeOf(pNode)) {
it.remove();
}
}
}
/**
* with a specified key, route the nearest Node instance in the current hash ring
* @param objectKey the object key to find a nearest Node
* @return
*/
public T routeNode(String objectKey) {
if (ring.isEmpty()) {
return null;
}
Long hashVal = hashFunction.hash(objectKey);
SortedMap<Long,VirtualNode<T>> tailMap = ring.tailMap(hashVal);
Long nodeHashVal = !tailMap.isEmpty() ? tailMap.firstKey() : ring.firstKey();
return ring.get(nodeHashVal).getPhysicalNode();
}
public int getExistingReplicas(T pNode) {
int replicas = 0;
for (VirtualNode<T> vNode : ring.values()) {
if (vNode.isVirtualNodeOf(pNode)) {
replicas++;
}
}
return replicas;
}
//default hash function
private static class MD5Hash implements HashFunction {
MessageDigest instance;
public MD5Hash() {
try {
instance = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
}
}
@Override
public long hash(String key) {
instance.reset();
instance.update(key.getBytes());
byte[] digest = instance.digest();
long h = 0;
for (int i = 0; i < 4; i++) {
h <<= 8;
h |= ((int) digest[i]) & 0xFF;
}
return h;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.consistenthash;
/**
* Hash String to long value
*/
public interface HashFunction {
long hash(String key);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.consistenthash;
/**
* Represent a node which should be mapped to a hash ring
*/
public interface Node {
/**
*
* @return the key which will be used for hash mapping
*/
String getKey();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.consistenthash;
public class VirtualNode<T extends Node> implements Node {
final T physicalNode;
final int replicaIndex;
public VirtualNode(T physicalNode, int replicaIndex) {
this.replicaIndex = replicaIndex;
this.physicalNode = physicalNode;
}
@Override
public String getKey() {
return physicalNode.getKey() + "-" + replicaIndex;
}
public boolean isVirtualNodeOf(T pNode) {
return physicalNode.getKey().equals(pNode.getKey());
}
public T getPhysicalNode() {
return physicalNode;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册