未验证 提交 c60cb0fc 编写于 作者: J Jack Tsai 提交者: GitHub

[ISSUE #2149] Apache RocketMQ rebalancing architecture optimization (#2169)

上级 fac30c35
......@@ -73,6 +73,7 @@ import org.apache.rocketmq.broker.transaction.queue.DefaultTransactionalMessageC
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl;
import org.apache.rocketmq.broker.util.ServiceProvider;
import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.Configuration;
import org.apache.rocketmq.common.DataVersion;
......@@ -143,6 +144,8 @@ public class BrokerController {
private final BrokerStatsManager brokerStatsManager;
private final List<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
private final List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
private final ConcurrentMap<String /* Consumer Group */, AllocateMessageQueueStrategy /* Strategy Object */> allocateMessageQueueStrategyTable
= new ConcurrentHashMap<String, AllocateMessageQueueStrategy>();
private MessageStore messageStore;
private RemotingServer remotingServer;
private RemotingServer fastRemotingServer;
......@@ -605,10 +608,12 @@ public class BrokerController {
this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.remotingServer.registerProcessor(RequestCode.ALLOCATE_MESSAGE_QUEUE, consumerManageProcessor, this.consumerManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.ALLOCATE_MESSAGE_QUEUE, consumerManageProcessor, this.consumerManageExecutor);
/**
* EndTransactionProcessor
......@@ -741,6 +746,10 @@ public class BrokerController {
return subscriptionGroupManager;
}
public ConcurrentMap<String, AllocateMessageQueueStrategy> getAllocateMessageQueueStrategyTable() {
return allocateMessageQueueStrategyTable;
}
public void shutdown() {
if (this.brokerStatsManager != null) {
this.brokerStatsManager.shutdown();
......
......@@ -18,12 +18,18 @@ package org.apache.rocketmq.broker.processor;
import io.netty.channel.ChannelHandlerContext;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.AllocateMessageQueueRequestBody;
import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueRequestHeader;
import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueResponseBody;
import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueResponseHeader;
import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseBody;
import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseHeader;
......@@ -31,6 +37,14 @@ import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHead
import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHeader;
import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragelyByCircle;
import org.apache.rocketmq.common.rebalance.AllocateMessageQueueByConfig;
import org.apache.rocketmq.common.rebalance.AllocateMessageQueueByMachineRoom;
import org.apache.rocketmq.common.rebalance.AllocateMessageQueueConsistentHash;
import org.apache.rocketmq.common.rebalance.AllocateMessageQueueSticky;
import org.apache.rocketmq.common.rebalance.AllocateMessageQueueStrategyConstants;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
......@@ -57,6 +71,8 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
return this.updateConsumerOffset(ctx, request);
case RequestCode.QUERY_CONSUMER_OFFSET:
return this.queryConsumerOffset(ctx, request);
case RequestCode.ALLOCATE_MESSAGE_QUEUE:
return this.allocateMessageQueue(ctx, request);
default:
break;
}
......@@ -152,4 +168,73 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
return response;
}
private synchronized RemotingCommand allocateMessageQueue(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
final RemotingCommand response =
RemotingCommand.createResponseCommand(AllocateMessageQueueResponseHeader.class);
final AllocateMessageQueueRequestHeader requestHeader =
(AllocateMessageQueueRequestHeader) request.decodeCommandCustomHeader(AllocateMessageQueueRequestHeader.class);
final AllocateMessageQueueRequestBody requestBody = AllocateMessageQueueRequestBody.decode(request.getBody(),
AllocateMessageQueueRequestBody.class);
AllocateMessageQueueStrategy strategy = null;
String consumerGroup = requestHeader.getConsumerGroup();
String strategyName = requestHeader.getStrategyName();
Map<String, AllocateMessageQueueStrategy> strategyTable = this.brokerController.getAllocateMessageQueueStrategyTable();
if (strategyTable.containsKey(consumerGroup) && strategyName.equals(strategyTable.get(consumerGroup).getName())) {
strategy = strategyTable.get(consumerGroup);
} else {
switch (strategyName) {
case AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_AVERAGELY:
strategy = new AllocateMessageQueueAveragely();
break;
case AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_AVERAGELY_BY_CIRCLE:
strategy = new AllocateMessageQueueAveragelyByCircle();
break;
case AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_BY_CONFIG:
strategy = new AllocateMessageQueueByConfig();
break;
case AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_BY_MACHINE_ROOM:
strategy = new AllocateMessageQueueByMachineRoom();
break;
case AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_CONSISTENT_HASH:
strategy = new AllocateMessageQueueConsistentHash();
break;
case AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_STICKY:
strategy = new AllocateMessageQueueSticky();
break;
default:
response.setCode(ResponseCode.ALLOCATE_MESSAGE_QUEUE_FAILED);
response.setRemark("AllocateMessageQueueStrategy[" + strategyName + "] is not supported by broker");
return response;
}
strategyTable.put(consumerGroup, strategy);
}
ConsumerGroupInfo consumerGroupInfo = this.brokerController.getConsumerManager().getConsumerGroupInfo(consumerGroup);
List<MessageQueue> allocateResult = null;
try {
allocateResult = strategy.allocate(
requestHeader.getConsumerGroup(),
requestHeader.getClientID(),
requestBody.getMqAll(),
consumerGroupInfo != null ? consumerGroupInfo.getAllClientId() : null);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}",
strategy.getName(), e);
response.setCode(ResponseCode.ALLOCATE_MESSAGE_QUEUE_FAILED);
response.setRemark(e.getMessage());
return response;
}
AllocateMessageQueueResponseBody body = new AllocateMessageQueueResponseBody();
body.setAllocateResult(allocateResult);
response.setBody(body.encode());
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.processor;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.AllocateMessageQueueRequestBody;
import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueRequestHeader;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
import org.apache.rocketmq.common.rebalance.AllocateMessageQueueStrategyConstants;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
import static org.apache.rocketmq.broker.processor.PullMessageProcessorTest.createConsumerData;
import static org.assertj.core.api.Assertions.assertThat;
@RunWith(MockitoJUnitRunner.class)
public class ConsumerManageProcessorTest {
private ConsumerManageProcessor consumerManageProcessor;
@Spy
private BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new MessageStoreConfig());
@Mock
private ChannelHandlerContext handlerContext;
@Mock
private Channel channel;
private ClientChannelInfo clientChannelInfo;
private String clientId = UUID.randomUUID().toString();
private String group = "FooBarGroup";
private String topic = "FooBar";
private List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
@Before
public void init() {
consumerManageProcessor = new ConsumerManageProcessor(brokerController);
clientChannelInfo = new ClientChannelInfo(channel, clientId, LanguageCode.JAVA, 100);
mqAll.add(new MessageQueue(topic, brokerController.getBrokerConfig().getBrokerName(), 0));
mqAll.add(new MessageQueue(topic, brokerController.getBrokerConfig().getBrokerName(), 1));
mqAll.add(new MessageQueue(topic, brokerController.getBrokerConfig().getBrokerName(), 3));
mqAll.add(new MessageQueue(topic, brokerController.getBrokerConfig().getBrokerName(), 4));
ConsumerData consumerData = createConsumerData(group, topic);
brokerController.getConsumerManager().registerConsumer(
consumerData.getGroupName(),
clientChannelInfo,
consumerData.getConsumeType(),
consumerData.getMessageModel(),
consumerData.getConsumeFromWhere(),
consumerData.getSubscriptionDataSet(),
false);
}
@Test
public void testAllocateMessageQueue() throws RemotingCommandException {
String emptyClientId = "";
RemotingCommand request = buildAllocateMessageQueueRequest(emptyClientId, AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_AVERAGELY);
RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.ALLOCATE_MESSAGE_QUEUE_FAILED);
assertThat(response.getRemark()).isEqualTo("currentCID is empty");
request = buildAllocateMessageQueueRequest(clientId, AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_AVERAGELY);
response = consumerManageProcessor.processRequest(handlerContext, request);
assertThat(response.getBody()).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}
private RemotingCommand buildAllocateMessageQueueRequest(String clientId, String strategy) {
AllocateMessageQueueRequestHeader requestHeader = new AllocateMessageQueueRequestHeader();
requestHeader.setConsumerGroup(group);
requestHeader.setClientID(clientId);
requestHeader.setStrategyName(strategy);
AllocateMessageQueueRequestBody requestBody = new AllocateMessageQueueRequestBody();
requestBody.setMqAll(mqAll);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.ALLOCATE_MESSAGE_QUEUE, requestHeader);
request.setBody(requestBody.encode());
request.makeCustomHeaderToNet();
return request;
}
}
......@@ -208,7 +208,7 @@ public class PullMessageProcessorTest {
return request;
}
static ConsumerData createConsumerData(String group, String topic) {
public static ConsumerData createConsumerData(String group, String topic) {
ConsumerData consumerData = new ConsumerData();
consumerData.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumerData.setConsumeType(ConsumeType.CONSUME_PASSIVELY);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.rebalance;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.processor.ConsumerManageProcessor;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.AllocateMessageQueueRequestBody;
import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueRequestHeader;
import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueResponseBody;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
import org.apache.rocketmq.common.rebalance.AllocateMessageQueueStrategyConstants;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
import static org.apache.rocketmq.broker.processor.PullMessageProcessorTest.createConsumerData;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
@RunWith(MockitoJUnitRunner.class)
public class AllocateMessageQueueStickyTest {
private ConsumerManageProcessor consumerManageProcessor;
@Spy
private final BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new MessageStoreConfig());
@Mock
private ChannelHandlerContext handlerContext;
private static final String CID_PREFIX = "CID-";
private final String group = "FooBarGroup";
private final String topic = "FooBar";
private List<MessageQueue> messageQueueList;
private List<String> consumerIdList;
@Before
public void init() {
consumerManageProcessor = new ConsumerManageProcessor(brokerController);
messageQueueList = new ArrayList<MessageQueue>();
consumerIdList = new ArrayList<String>();
}
@Test
public void testCurrentCIDNotExists() throws RemotingCommandException {
String currentCID = String.valueOf(Integer.MAX_VALUE);
createConsumerIdList(4);
createMessageQueueList(15);
RemotingCommand request = buildAllocateMessageQueueRequest(currentCID, messageQueueList);
RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
Assert.assertEquals(AllocateMessageQueueRequestBody.decode(response.getBody(),
AllocateMessageQueueResponseBody.class).getAllocateResult().size(), 0);
}
@Test
public void testCurrentCIDIllegalArgument() throws RemotingCommandException {
String currentCID = "";
createConsumerIdList(4);
createMessageQueueList(15);
RemotingCommand request = buildAllocateMessageQueueRequest(currentCID, messageQueueList);
RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.ALLOCATE_MESSAGE_QUEUE_FAILED);
assertThat(response.getRemark()).isEqualTo("currentCID is empty");
}
@Test
public void testMessageQueueIllegalArgument() throws RemotingCommandException {
String currentCID = CID_PREFIX + 0;
createConsumerIdList(4);
createMessageQueueList(0);
RemotingCommand request = buildAllocateMessageQueueRequest(currentCID, messageQueueList);
RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.ALLOCATE_MESSAGE_QUEUE_FAILED);
assertThat(response.getRemark()).isEqualTo("mqAll is null or mqAll empty");
}
@Test
public void testConsumerIdIllegalArgument() throws RemotingCommandException {
String currentCID = CID_PREFIX + 0;
createConsumerIdList(0);
createMessageQueueList(15);
RemotingCommand request = buildAllocateMessageQueueRequest(currentCID, messageQueueList);
RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.ALLOCATE_MESSAGE_QUEUE_FAILED);
assertThat(response.getRemark()).isEqualTo("cidAll is null or cidAll empty");
}
@Test
public void testAllocateMessageQueue1() throws RemotingCommandException {
createConsumerIdList(4);
createMessageQueueList(10);
for (String clientId : consumerIdList) {
RemotingCommand request = buildAllocateMessageQueueRequest(clientId, messageQueueList);
RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
assertThat(response.getBody()).isNotNull();
System.out.println(AllocateMessageQueueRequestBody.decode(response.getBody(),
AllocateMessageQueueResponseBody.class).getAllocateResult());
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}
}
@Test
public void testAllocateMessageQueue2() throws RemotingCommandException {
createConsumerIdList(10);
createMessageQueueList(4);
for (String clientId : consumerIdList) {
RemotingCommand request = buildAllocateMessageQueueRequest(clientId, messageQueueList);
RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
assertThat(response.getBody()).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}
}
@Test
public void testRun10RandomCase() throws RemotingCommandException {
for (int i = 0; i < 10; i++) {
int consumerSize = new Random().nextInt(20) + 2;
int queueSize = new Random().nextInt(20) + 4;
testAllocateMessageQueue(consumerSize, queueSize);
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private void testAllocateMessageQueue(int consumerSize, int queueSize) throws RemotingCommandException {
createConsumerIdList(consumerSize);
createMessageQueueList(queueSize);
Map<MessageQueue, String> allocateToAllOrigin = new TreeMap<MessageQueue, String>();
List<MessageQueue> allocatedResAll = new ArrayList<MessageQueue>();
// test allocate all
{
List<String> cidBegin = new ArrayList<String>(consumerIdList);
for (String cid : cidBegin) {
RemotingCommand request = buildAllocateMessageQueueRequest(cid, messageQueueList);
RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
assertThat(response.getBody()).isNotNull();
List<MessageQueue> rs = AllocateMessageQueueRequestBody.decode(response.getBody(),
AllocateMessageQueueResponseBody.class).getAllocateResult();
for (MessageQueue mq : rs) {
allocateToAllOrigin.put(mq, cid);
}
allocatedResAll.addAll(rs);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}
Assert.assertTrue(verifyAllocateAllConsumer(cidBegin, messageQueueList, allocatedResAll));
}
Map<MessageQueue, String> allocateToAllAfterRemoveOneConsumer = new TreeMap<MessageQueue, String>();
List<String> cidAfterRemoveOneConsumer = new ArrayList<String>(consumerIdList);
// test allocate after removing one cid
{
String removeCID = cidAfterRemoveOneConsumer.remove(0);
unregisterConsumer(removeCID);
List<MessageQueue> mqShouldBeChanged = new ArrayList<MessageQueue>();
for (Map.Entry<MessageQueue, String> entry : allocateToAllOrigin.entrySet()) {
if (entry.getValue().equals(removeCID)) {
mqShouldBeChanged.add(entry.getKey());
}
}
List<MessageQueue> allocatedResAllAfterRemoveOneConsumer = new ArrayList<MessageQueue>();
for (String cid : cidAfterRemoveOneConsumer) {
RemotingCommand request = buildAllocateMessageQueueRequest(cid, messageQueueList);
RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
assertThat(response.getBody()).isNotNull();
List<MessageQueue> rs = AllocateMessageQueueRequestBody.decode(response.getBody(),
AllocateMessageQueueResponseBody.class).getAllocateResult();
allocatedResAllAfterRemoveOneConsumer.addAll(rs);
for (MessageQueue mq : rs) {
allocateToAllAfterRemoveOneConsumer.put(mq, cid);
}
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}
Assert.assertTrue(verifyAllocateAllConsumer(cidAfterRemoveOneConsumer, messageQueueList, allocatedResAllAfterRemoveOneConsumer));
verifyAfterRemoveConsumer(allocateToAllOrigin, allocateToAllAfterRemoveOneConsumer, removeCID);
}
Map<MessageQueue, String> allocateToAllAfterAddOneConsumer = new TreeMap<MessageQueue, String>();
List<String> cidAfterAddOneConsumer = new ArrayList<String>(cidAfterRemoveOneConsumer);
// test allocate after adding one more cid
{
String newCid = CID_PREFIX + "NEW";
cidAfterAddOneConsumer.add(newCid);
registerConsumer(newCid);
List<MessageQueue> mqShouldOnlyChanged = new ArrayList<MessageQueue>();
List<MessageQueue> allocatedResAllAfterAddOneConsumer = new ArrayList<MessageQueue>();
for (String cid : cidAfterAddOneConsumer) {
RemotingCommand request = buildAllocateMessageQueueRequest(cid, messageQueueList);
RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
assertThat(response.getBody()).isNotNull();
List<MessageQueue> rs = AllocateMessageQueueRequestBody.decode(response.getBody(),
AllocateMessageQueueResponseBody.class).getAllocateResult();
allocatedResAllAfterAddOneConsumer.addAll(rs);
for (MessageQueue mq : rs) {
allocateToAllAfterAddOneConsumer.put(mq, cid);
if (cid.equals(newCid)) {
mqShouldOnlyChanged.add(mq);
}
}
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}
Assert.assertTrue(verifyAllocateAllConsumer(cidAfterAddOneConsumer, messageQueueList, allocatedResAllAfterAddOneConsumer));
verifyAfterAddConsumer(allocateToAllAfterRemoveOneConsumer, allocateToAllAfterAddOneConsumer, newCid);
}
Map<MessageQueue, String> allocateToAllAfterRemoveTwoMq = new TreeMap<MessageQueue, String>();
List<MessageQueue> mqAfterRemoveTwoMq = new ArrayList<>(messageQueueList);
// test allocate after removing two message queues
{
for (int i = 0; i < 2; i++) {
mqAfterRemoveTwoMq.remove(i);
}
List<MessageQueue> allocatedResAfterRemoveTwoMq = new ArrayList<MessageQueue>();
for (String cid : cidAfterAddOneConsumer) {
RemotingCommand request = buildAllocateMessageQueueRequest(cid, mqAfterRemoveTwoMq);
RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
assertThat(response.getBody()).isNotNull();
List<MessageQueue> rs = AllocateMessageQueueRequestBody.decode(response.getBody(),
AllocateMessageQueueResponseBody.class).getAllocateResult();
allocatedResAfterRemoveTwoMq.addAll(rs);
for (MessageQueue mq : rs) {
allocateToAllAfterRemoveTwoMq.put(mq, cid);
}
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}
Assert.assertTrue(verifyAllocateAllConsumer(cidAfterAddOneConsumer, mqAfterRemoveTwoMq, allocatedResAfterRemoveTwoMq));
}
}
private void verifyAfterAddConsumer(Map<MessageQueue, String> allocateBefore,
Map<MessageQueue, String> allocateAfter, String newCID) {
for (MessageQueue mq : allocateAfter.keySet()) {
String allocateToOrigin = allocateBefore.get(mq);
String allocateToAfter = allocateAfter.get(mq);
if (!allocateToAfter.equals(newCID)) { // the rest queues should be the same
Assert.assertEquals(allocateToAfter, allocateToOrigin);
}
}
}
private void verifyAfterRemoveConsumer(Map<MessageQueue, String> allocateToBefore,
Map<MessageQueue, String> allocateAfter, String removeCID) {
for (MessageQueue mq : allocateToBefore.keySet()) {
String allocateToOrigin = allocateToBefore.get(mq);
String allocateToAfter = allocateAfter.get(mq);
if (!allocateToOrigin.equals(removeCID)) { // the rest queues should be the same
Assert.assertEquals(allocateToAfter, allocateToOrigin); // should be the same
}
}
}
private boolean verifyAllocateAllConsumer(List<String> cidAll, List<MessageQueue> mqAll,
List<MessageQueue> allocatedResAll) {
if (cidAll.isEmpty()) {
return allocatedResAll.isEmpty();
}
return mqAll.containsAll(allocatedResAll) && allocatedResAll.containsAll(mqAll);
}
private void registerConsumer(String clientId) {
Channel channel = mock(Channel.class);
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(channel, clientId, LanguageCode.JAVA, 100);
ConsumerData consumerData = createConsumerData(group, topic);
brokerController.getConsumerManager().registerConsumer(
consumerData.getGroupName(),
clientChannelInfo,
consumerData.getConsumeType(),
consumerData.getMessageModel(),
consumerData.getConsumeFromWhere(),
consumerData.getSubscriptionDataSet(),
false);
consumerIdList.add(clientId);
}
private void unregisterConsumer(String clientId) {
Channel channel = brokerController.getConsumerManager().getConsumerGroupInfo(group).findChannel(clientId).getChannel();
brokerController.getConsumerManager().unregisterConsumer(group,
new ClientChannelInfo(channel, clientId, LanguageCode.JAVA, 100), true);
consumerIdList.remove(clientId);
}
private RemotingCommand buildAllocateMessageQueueRequest(String clientId, List<MessageQueue> messageQueueList) {
AllocateMessageQueueRequestHeader requestHeader = new AllocateMessageQueueRequestHeader();
requestHeader.setConsumerGroup(group);
requestHeader.setClientID(clientId);
requestHeader.setStrategyName(AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_STICKY);
AllocateMessageQueueRequestBody requestBody = new AllocateMessageQueueRequestBody();
requestBody.setMqAll(messageQueueList);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.ALLOCATE_MESSAGE_QUEUE, requestHeader);
request.setBody(requestBody.encode());
request.makeCustomHeaderToNet();
return request;
}
private void createConsumerIdList(int size) {
for (int i = 0; i < size; i++) {
String clientId = CID_PREFIX + i;
registerConsumer(clientId);
}
}
private void createMessageQueueList(int size) {
for (int i = 0; i < size; i++) {
MessageQueue mq = new MessageQueue(topic, "brokerName", i);
messageQueueList.add(mq);
}
}
}
......@@ -19,10 +19,10 @@ package org.apache.rocketmq.client.consumer;
import java.util.Collection;
import java.util.List;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl;
import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
......@@ -30,6 +30,7 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.remoting.RPCHook;
public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer {
......@@ -66,6 +67,11 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
* Consumption pattern,default is clustering
*/
private MessageModel messageModel = MessageModel.CLUSTERING;
/**
* The switch for applying the rebalancing calculation task at the broker side
*/
private boolean rebalanceByBroker = false;
/**
* Message queue listener
*/
......@@ -409,6 +415,14 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
this.messageModel = messageModel;
}
public boolean isRebalanceByBroker() {
return rebalanceByBroker;
}
public void setRebalanceByBroker(boolean rebalanceByBroker) {
this.rebalanceByBroker = rebalanceByBroker;
}
public String getConsumerGroup() {
return consumerGroup;
}
......
......@@ -20,17 +20,18 @@ import java.util.HashSet;
import java.util.Set;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl;
import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
......@@ -65,6 +66,10 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
* Consumption pattern,default is clustering
*/
private MessageModel messageModel = MessageModel.CLUSTERING;
/**
* The switch for applying the rebalancing calculation task at the broker side
*/
private boolean rebalanceByBroker = false;
/**
* Message queue listener
*/
......@@ -245,6 +250,14 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
this.messageModel = messageModel;
}
public boolean isRebalanceByBroker() {
return rebalanceByBroker;
}
public void setRebalanceByBroker(boolean rebalanceByBroker) {
this.rebalanceByBroker = rebalanceByBroker;
}
public MessageQueueListener getMessageQueueListener() {
return messageQueueListener;
}
......
......@@ -24,7 +24,6 @@ import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
......@@ -33,6 +32,7 @@ import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.client.trace.hook.ConsumeMessageTraceHookImpl;
import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
......@@ -41,6 +41,7 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
......@@ -92,6 +93,19 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*/
private MessageModel messageModel = MessageModel.CLUSTERING;
/**
* The switch for applying the rebalancing calculation task at the broker side.
* </p>
*
* RocketMQ supports two message models: clustering and broadcasting. If clustering is set, consumer clients with
* the same {@link #consumerGroup} would execute rebalancing calculations at the client side in default. The switch
* is responsible for shifting the rebalancing calculation task to the broker side.
* </p>
*
* This field defaults to false.
*/
private boolean rebalanceByBroker = false;
/**
* Consuming point on consumer booting.
* </p>
......@@ -574,6 +588,14 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
this.messageModel = messageModel;
}
public boolean isRebalanceByBroker() {
return rebalanceByBroker;
}
public void setRebalanceByBroker(boolean rebalanceByBroker) {
this.rebalanceByBroker = rebalanceByBroker;
}
public int getPullBatchSize() {
return pullBatchSize;
}
......
......@@ -63,6 +63,7 @@ import org.apache.rocketmq.common.namesrv.TopAddressing;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.AllocateMessageQueueRequestBody;
import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
import org.apache.rocketmq.common.protocol.body.CheckClientRequestBody;
import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo;
......@@ -86,6 +87,8 @@ import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody;
import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueRequestHeader;
import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueResponseBody;
import org.apache.rocketmq.common.protocol.header.CloneGroupOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
......@@ -901,6 +904,39 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
public List<MessageQueue> getAllocateResultByStrategy(final String addr, final String group, final String clientId,
final String strategyName, final List<MessageQueue> mqAll, final long timeoutMillis)
throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
MQBrokerException, InterruptedException {
AllocateMessageQueueRequestHeader requestHeader = new AllocateMessageQueueRequestHeader();
requestHeader.setConsumerGroup(group);
requestHeader.setClientID(clientId);
requestHeader.setStrategyName(strategyName);
AllocateMessageQueueRequestBody requestBody = new AllocateMessageQueueRequestBody();
requestBody.setMqAll(mqAll);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.ALLOCATE_MESSAGE_QUEUE, requestHeader);
request.setBody(requestBody.encode());
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
if (response.getBody() != null) {
AllocateMessageQueueResponseBody body = AllocateMessageQueueRequestBody.decode(response.getBody(),
AllocateMessageQueueResponseBody.class);
return body.getAllocateResult();
}
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark());
}
public long getMinOffset(final String addr, final String topic, final int queueId, final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException {
GetMinOffsetRequestHeader requestHeader = new GetMinOffsetRequestHeader();
......
......@@ -873,6 +873,11 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
return this.defaultLitePullConsumer.getMessageModel();
}
@Override
public boolean rebalanceByBroker() {
return this.defaultLitePullConsumer.isRebalanceByBroker();
}
@Override
public ConsumeType consumeType() {
return ConsumeType.CONSUME_ACTIVELY;
......
......@@ -345,6 +345,11 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
return this.defaultMQPullConsumer.getMessageModel();
}
@Override
public boolean rebalanceByBroker() {
return this.defaultMQPullConsumer.isRebalanceByBroker();
}
@Override
public ConsumeType consumeType() {
return ConsumeType.CONSUME_ACTIVELY;
......
......@@ -990,6 +990,11 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
return this.defaultMQPushConsumer.getMessageModel();
}
@Override
public boolean rebalanceByBroker() {
return this.defaultMQPushConsumer.isRebalanceByBroker();
}
@Override
public ConsumeType consumeType() {
return ConsumeType.CONSUME_PASSIVELY;
......
......@@ -32,6 +32,8 @@ public interface MQConsumerInner {
MessageModel messageModel();
boolean rebalanceByBroker();
ConsumeType consumeType();
ConsumeFromWhere consumeFromWhere();
......
......@@ -27,18 +27,19 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody;
import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.rebalance.AllocateMessageQueueStrategyConstants;
import org.apache.rocketmq.logging.InternalLogger;
public abstract class RebalanceImpl {
protected static final InternalLogger log = ClientLogger.getLog();
......@@ -277,16 +278,30 @@ public abstract class RebalanceImpl {
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
allocateResult = strategy.allocate(
if (isRebalanceByBroker()) {
allocateResult = this.mQClientFactory.getAllocateResult(
topic,
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
e);
return;
strategy.getName(),
mqAll);
} else {
if (strategy.getName().equals(AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_STICKY)) {
log.error("AllocateMessageQueueStrategy is not supported while rebalanceByBroker=false. allocateMessageQueueStrategyName={}",
strategy.getName());
return;
}
try {
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
e);
return;
}
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
......@@ -412,6 +427,8 @@ public abstract class RebalanceImpl {
public abstract void dispatchPullRequest(final List<PullRequest> pullRequestList);
public abstract boolean isRebalanceByBroker();
public void removeProcessQueue(final MessageQueue mq) {
ProcessQueue prev = this.processQueueTable.remove(mq);
if (prev != null) {
......
......@@ -18,11 +18,11 @@ package org.apache.rocketmq.client.impl.consumer;
import java.util.List;
import java.util.Set;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
......@@ -142,4 +142,9 @@ public class RebalanceLitePullImpl extends RebalanceImpl {
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
}
@Override
public boolean isRebalanceByBroker() {
return this.litePullConsumerImpl.rebalanceByBroker();
}
}
......@@ -18,9 +18,9 @@ package org.apache.rocketmq.client.impl.consumer;
import java.util.List;
import java.util.Set;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
......@@ -76,4 +76,9 @@ public class RebalancePullImpl extends RebalanceImpl {
@Override
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
}
@Override
public boolean isRebalanceByBroker() {
return this.defaultMQPullConsumerImpl.rebalanceByBroker();
}
}
......@@ -19,11 +19,11 @@ package org.apache.rocketmq.client.impl.consumer;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
......@@ -218,4 +218,9 @@ public class RebalancePushImpl extends RebalanceImpl {
log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
}
}
@Override
public boolean isRebalanceByBroker() {
return this.defaultMQPushConsumerImpl.rebalanceByBroker();
}
}
......@@ -18,6 +18,7 @@ package org.apache.rocketmq.client.impl.factory;
import java.io.UnsupportedEncodingException;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
......@@ -442,7 +443,7 @@ public class MQClientInstance {
}
// may need to check one broker every cluster...
// assume that the configs of every broker in cluster are the the same.
String addr = findBrokerAddrByTopic(subscriptionData.getTopic());
String addr = findRandomBrokerAddrByTopic(subscriptionData.getTopic());
if (addr != null) {
try {
......@@ -1069,10 +1070,10 @@ public class MQClientInstance {
}
public List<String> findConsumerIdList(final String topic, final String group) {
String brokerAddr = this.findBrokerAddrByTopic(topic);
String brokerAddr = this.findRandomBrokerAddrByTopic(topic);
if (null == brokerAddr) {
this.updateTopicRouteInfoFromNameServer(topic);
brokerAddr = this.findBrokerAddrByTopic(topic);
brokerAddr = this.findRandomBrokerAddrByTopic(topic);
}
if (null != brokerAddr) {
......@@ -1086,7 +1087,7 @@ public class MQClientInstance {
return null;
}
public String findBrokerAddrByTopic(final String topic) {
public String findRandomBrokerAddrByTopic(final String topic) {
TopicRouteData topicRouteData = this.topicRouteTable.get(topic);
if (topicRouteData != null) {
List<BrokerData> brokers = topicRouteData.getBrokerDatas();
......@@ -1100,6 +1101,24 @@ public class MQClientInstance {
return null;
}
public String findUniqueBrokerAddrByTopic(final String topic) {
TopicRouteData topicRouteData = this.topicRouteTable.get(topic);
if (topicRouteData != null) {
List<BrokerData> brokers = topicRouteData.getBrokerDatas();
if (!brokers.isEmpty()) {
Collections.sort(brokers, new Comparator<BrokerData>() {
@Override public int compare(BrokerData o1, BrokerData o2) {
return o1.getBrokerName().compareTo(o2.getBrokerName());
}
});
BrokerData bd = brokers.get(0);
return bd.selectBrokerAddr();
}
}
return null;
}
public void resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable) {
DefaultMQPushConsumerImpl consumer = null;
try {
......@@ -1161,6 +1180,26 @@ public class MQClientInstance {
}
}
public List<MessageQueue> getAllocateResult(final String topic, final String group, final String strategyName,
final List<MessageQueue> mqAll) {
String brokerAddr = this.findUniqueBrokerAddrByTopic(topic);
if (null == brokerAddr) {
this.updateTopicRouteInfoFromNameServer(topic);
brokerAddr = this.findUniqueBrokerAddrByTopic(topic);
}
if (null != brokerAddr) {
try {
return this.mQClientAPIImpl.getAllocateResultByStrategy(brokerAddr, group, clientId, strategyName,
mqAll, 3000);
} catch (Exception e) {
log.warn("getAllocateResultByStrategy exception, {} {}", brokerAddr, group, e);
}
}
return null;
}
public TopicRouteData getAnExistTopicRouteData(final String topic) {
return this.topicRouteTable.get(topic);
}
......
......@@ -23,8 +23,10 @@ import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.rebalance.AllocateMachineRoomNearby;
import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragely;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
......
......@@ -22,8 +22,9 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.rebalance.AllocateMessageQueueConsistentHash;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
......
......@@ -20,13 +20,13 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragely;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
......
......@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.consumer;
package org.apache.rocketmq.common;
import java.util.List;
import org.apache.rocketmq.common.message.MessageQueue;
......
......@@ -33,6 +33,8 @@ public class RequestCode {
public static final int GET_TOPIC_NAME_LIST = 23;
public static final int ALLOCATE_MESSAGE_QUEUE = 24;
public static final int UPDATE_BROKER_CONFIG = 25;
public static final int GET_BROKER_CONFIG = 26;
......
......@@ -80,4 +80,5 @@ public class ResponseCode extends RemotingSysResponseCode {
public static final int UPDATE_GLOBAL_WHITE_ADDRS_CONFIG_FAILED = 211;
public static final int ALLOCATE_MESSAGE_QUEUE_FAILED = 212;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.body;
import java.util.List;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
public class AllocateMessageQueueRequestBody extends RemotingSerializable {
private List<MessageQueue> mqAll;
public List<MessageQueue> getMqAll() {
return mqAll;
}
public void setMqAll(List<MessageQueue> mqAll) {
this.mqAll = mqAll;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class AllocateMessageQueueRequestHeader implements CommandCustomHeader {
@CFNotNull
private String consumerGroup;
@CFNotNull
private String clientID;
@CFNotNull
private String strategyName;
@Override public void checkFields() throws RemotingCommandException {
}
public String getConsumerGroup() {
return consumerGroup;
}
public void setConsumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup;
}
public String getClientID() {
return clientID;
}
public void setClientID(String clientID) {
this.clientID = clientID;
}
public String getStrategyName() {
return strategyName;
}
public void setStrategyName(String strategyName) {
this.strategyName = strategyName;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header;
import java.util.List;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
public class AllocateMessageQueueResponseBody extends RemotingSerializable {
private List<MessageQueue> allocateResult;
public List<MessageQueue> getAllocateResult() {
return allocateResult;
}
public void setAllocateResult(List<MessageQueue> allocateResult) {
this.allocateResult = allocateResult;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class AllocateMessageQueueResponseHeader implements CommandCustomHeader {
@Override public void checkFields() throws RemotingCommandException {
}
}
......@@ -14,17 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.consumer.rebalance;
package org.apache.rocketmq.common.rebalance;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
/**
* An allocate strategy proxy for based on machine room nearside priority. An actual allocate strategy can be
......@@ -35,7 +36,7 @@ import org.apache.rocketmq.logging.InternalLogger;
* no alive consumer to monopolize them.
*/
public class AllocateMachineRoomNearby implements AllocateMessageQueueStrategy {
private final InternalLogger log = ClientLogger.getLog();
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
private final AllocateMessageQueueStrategy allocateMessageQueueStrategy;//actual allocate strategy
private final MachineRoomResolver machineRoomResolver;
......
......@@ -14,20 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.consumer.rebalance;
package org.apache.rocketmq.common.rebalance;
import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
/**
* Average Hashing queue algorithm
*/
public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
private final InternalLogger log = ClientLogger.getLog();
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
......
......@@ -14,20 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.consumer.rebalance;
package org.apache.rocketmq.common.rebalance;
import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
/**
* Cycle average Hashing queue algorithm
*/
public class AllocateMessageQueueAveragelyByCircle implements AllocateMessageQueueStrategy {
private final InternalLogger log = ClientLogger.getLog();
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
......
......@@ -14,10 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.consumer.rebalance;
package org.apache.rocketmq.common.rebalance;
import java.util.List;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.message.MessageQueue;
public class AllocateMessageQueueByConfig implements AllocateMessageQueueStrategy {
......
......@@ -14,12 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.consumer.rebalance;
package org.apache.rocketmq.common.rebalance;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.message.MessageQueue;
/**
......
......@@ -14,24 +14,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.consumer.rebalance;
package org.apache.rocketmq.common.rebalance;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.consistenthash.ConsistentHashRouter;
import org.apache.rocketmq.common.consistenthash.HashFunction;
import org.apache.rocketmq.common.consistenthash.Node;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
/**
* Consistent Hashing queue algorithm
*/
public class AllocateMessageQueueConsistentHash implements AllocateMessageQueueStrategy {
private final InternalLogger log = ClientLogger.getLog();
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
private final int virtualNodeCnt;
private final HashFunction customHashFunction;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.rebalance;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
public class AllocateMessageQueueSticky implements AllocateMessageQueueStrategy {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
private final Map<String, List<MessageQueue>> messageQueueAllocation = new HashMap<String, List<MessageQueue>>();
private final List<MessageQueue> unassignedQueues = new ArrayList<MessageQueue>();
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID,
List<MessageQueue> mqAll, final List<String> cidAll) {
if (currentCID == null || currentCID.length() < 1) {
throw new IllegalArgumentException("currentCID is empty");
}
if (mqAll == null || mqAll.isEmpty()) {
throw new IllegalArgumentException("mqAll is null or mqAll empty");
}
if (cidAll == null || cidAll.isEmpty()) {
throw new IllegalArgumentException("cidAll is null or cidAll empty");
}
List<MessageQueue> result = new ArrayList<MessageQueue>();
if (!cidAll.contains(currentCID)) {
log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
consumerGroup,
currentCID,
cidAll);
return result;
}
// Rebalance (or fresh assignment) needed
if (cidAll.size() != messageQueueAllocation.size() || mqAll.size() != getPrevMqAll().size()) {
// Update messageQueueAllocation
updateMessageQueueAllocation(mqAll, cidAll);
// Sort consumers based on how many message queues are assigned to them
TreeSet<String> sortedSubscriptions = new TreeSet<String>(new ConsumerComparator(messageQueueAllocation));
sortedSubscriptions.addAll(messageQueueAllocation.keySet());
// Assign unassignedQueues to consumers so that queues allocations are as balanced as possible
for (MessageQueue mq : unassignedQueues) {
String consumer = sortedSubscriptions.first();
sortedSubscriptions.remove(consumer);
messageQueueAllocation.get(consumer).add(mq);
sortedSubscriptions.add(consumer);
}
unassignedQueues.clear();
// Reassignment until no message queue can be moved to improve the balance
Map<String, List<MessageQueue>> preBalanceAllocation = new HashMap<String, List<MessageQueue>>(messageQueueAllocation);
while (!isBalanced(sortedSubscriptions)) {
String leastSubscribedConsumer = sortedSubscriptions.first();
String mostSubscribedConsumer = sortedSubscriptions.last();
MessageQueue mqFromMostSubscribedConsumer = messageQueueAllocation.get(mostSubscribedConsumer).get(0);
messageQueueAllocation.get(leastSubscribedConsumer).add(mqFromMostSubscribedConsumer);
messageQueueAllocation.get(mostSubscribedConsumer).remove(mqFromMostSubscribedConsumer);
}
// Make sure it is getting a more balanced allocation than before; otherwise, revert to previous allocation
if (getBalanceScore(messageQueueAllocation) >= getBalanceScore(preBalanceAllocation)) {
deepCopy(preBalanceAllocation, messageQueueAllocation);
}
}
return messageQueueAllocation.get(currentCID);
}
private void updateMessageQueueAllocation(List<MessageQueue> mqAll, List<String> cidAll) {
// The current size of consumers is larger than before
if (cidAll.size() > messageQueueAllocation.size()) {
for (String cid : cidAll) {
if (!messageQueueAllocation.containsKey(cid)) {
messageQueueAllocation.put(cid, new ArrayList<MessageQueue>());
}
}
}
// The current size of consumers is smaller than before
if (cidAll.size() < messageQueueAllocation.size()) {
Iterator<Map.Entry<String, List<MessageQueue>>> it = messageQueueAllocation.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, List<MessageQueue>> entry = it.next();
if (!cidAll.contains(entry.getKey())) {
it.remove();
}
}
}
// The current size of message queues is larger than before
List<MessageQueue> prevMqAll = getPrevMqAll();
if (mqAll.size() > prevMqAll.size()) {
for (MessageQueue mq : mqAll) {
if (!prevMqAll.contains(mq)) {
unassignedQueues.add(mq);
}
}
}
// The current size of message queues is smaller than before
if (mqAll.size() < prevMqAll.size()) {
for (MessageQueue prevMq : prevMqAll) {
if (!isSameQueueIdExists(mqAll, prevMq.getQueueId())) {
for (List<MessageQueue> prevMqs : messageQueueAllocation.values()) {
prevMqs.remove(prevMq);
}
}
}
}
}
private static class ConsumerComparator implements Comparator<String>, Serializable {
private final Map<String, List<MessageQueue>> map;
ConsumerComparator(Map<String, List<MessageQueue>> map) {
this.map = map;
}
@Override
public int compare(String o1, String o2) {
int ret = map.get(o1).size() - map.get(o2).size();
if (ret == 0) {
ret = o1.compareTo(o2);
}
return ret;
}
}
private boolean isSameQueueIdExists(List<MessageQueue> mqAll, int prevMqId) {
for (MessageQueue mq : mqAll) {
if (mq.getQueueId() == prevMqId) {
return true;
}
}
return false;
}
private boolean isBalanced(TreeSet<String> sortedCurrentSubscriptions) {
int min = this.messageQueueAllocation.get(sortedCurrentSubscriptions.first()).size();
int max = this.messageQueueAllocation.get(sortedCurrentSubscriptions.last()).size();
// if minimum and maximum numbers of message queues allocated to consumers differ by at most 1
return min >= max - 1;
}
/**
* @return The balance score of the given allocation, which is the sum of assigned queues size difference of all
* consumer. A well balanced allocation with balance score of 0 (all consumers getting the same number of
* allocations). Lower balance score represents a more balanced allocation.
*/
private int getBalanceScore(Map<String, List<MessageQueue>> allocation) {
int score = 0;
Map<String, Integer> consumerAllocationSizes = new HashMap<String, Integer>();
for (Map.Entry<String, List<MessageQueue>> entry : allocation.entrySet()) {
consumerAllocationSizes.put(entry.getKey(), entry.getValue().size());
}
Iterator<Map.Entry<String, Integer>> it = consumerAllocationSizes.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, Integer> entry = it.next();
int consumerAllocationSize = entry.getValue();
it.remove();
for (Map.Entry<String, Integer> otherEntry : consumerAllocationSizes.entrySet()) {
score += Math.abs(consumerAllocationSize - otherEntry.getValue());
}
}
return score;
}
private void deepCopy(Map<String, List<MessageQueue>> source, Map<String, List<MessageQueue>> dest) {
dest.clear();
for (Map.Entry<String, List<MessageQueue>> entry : source.entrySet()) {
dest.put(entry.getKey(), new ArrayList<MessageQueue>(entry.getValue()));
}
}
private List<MessageQueue> getPrevMqAll() {
List<MessageQueue> prevMqAll = new ArrayList<MessageQueue>();
for (List<MessageQueue> queues : messageQueueAllocation.values()) {
prevMqAll.addAll(queues);
}
return prevMqAll;
}
@Override
public String getName() {
return "STICKY";
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.rebalance;
public class AllocateMessageQueueStrategyConstants {
public static final String ALLOCATE_MACHINE_ROOM_NEARBY = "MACHINE_ROOM_NEARBY";
public static final String ALLOCATE_MESSAGE_QUEUE_AVERAGELY = "AVG";
public static final String ALLOCATE_MESSAGE_QUEUE_AVERAGELY_BY_CIRCLE = "AVG_BY_CIRCLE";
public static final String ALLOCATE_MESSAGE_QUEUE_BY_CONFIG = "CONFIG";
public static final String ALLOCATE_MESSAGE_QUEUE_BY_MACHINE_ROOM = "MACHINE_ROOM";
public static final String ALLOCATE_MESSAGE_QUEUE_CONSISTENT_HASH = "CONSISTENT_HASH";
public static final String ALLOCATE_MESSAGE_QUEUE_STICKY = "STICKY";
}
......@@ -29,7 +29,6 @@ import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
......@@ -37,6 +36,7 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
......
......@@ -23,10 +23,10 @@ import java.util.Set;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册