From c60cb0fccbf1a2bc8e4b92d75f4e159691b885f7 Mon Sep 17 00:00:00 2001
From: Jack Tsai
Date: Mon, 14 Sep 2020 12:45:24 +0800
Subject: [PATCH] [ISSUE #2149] Apache RocketMQ rebalancing architecture
optimization (#2169)
---
.../rocketmq/broker/BrokerController.java | 9 +
.../processor/ConsumerManageProcessor.java | 87 ++++-
.../ConsumerManageProcessorTest.java | 115 ++++++
.../processor/PullMessageProcessorTest.java | 2 +-
.../AllocateMessageQueueStickyTest.java | 355 ++++++++++++++++++
.../consumer/DefaultLitePullConsumer.java | 16 +-
.../consumer/DefaultMQPullConsumer.java | 15 +-
.../consumer/DefaultMQPushConsumer.java | 24 +-
.../rocketmq/client/impl/MQClientAPIImpl.java | 36 ++
.../consumer/DefaultLitePullConsumerImpl.java | 5 +
.../consumer/DefaultMQPullConsumerImpl.java | 5 +
.../consumer/DefaultMQPushConsumerImpl.java | 5 +
.../client/impl/consumer/MQConsumerInner.java | 2 +
.../client/impl/consumer/RebalanceImpl.java | 39 +-
.../impl/consumer/RebalanceLitePullImpl.java | 7 +-
.../impl/consumer/RebalancePullImpl.java | 7 +-
.../impl/consumer/RebalancePushImpl.java | 7 +-
.../client/impl/factory/MQClientInstance.java | 47 ++-
.../AllocateMachineRoomNearByTest.java | 4 +-
...AllocateMessageQueueConsitentHashTest.java | 3 +-
.../impl/consumer/RebalancePushImplTest.java | 2 +-
.../common}/AllocateMessageQueueStrategy.java | 2 +-
.../rocketmq/common/protocol/RequestCode.java | 2 +
.../common/protocol/ResponseCode.java | 1 +
.../body/AllocateMessageQueueRequestBody.java | 34 ++
.../AllocateMessageQueueRequestHeader.java | 58 +++
.../AllocateMessageQueueResponseBody.java | 34 ++
.../AllocateMessageQueueResponseHeader.java | 27 ++
.../rebalance/AllocateMachineRoomNearby.java | 9 +-
.../AllocateMessageQueueAveragely.java | 11 +-
...AllocateMessageQueueAveragelyByCircle.java | 11 +-
.../AllocateMessageQueueByConfig.java | 4 +-
.../AllocateMessageQueueByMachineRoom.java | 4 +-
.../AllocateMessageQueueConsistentHash.java | 11 +-
.../rebalance/AllocateMessageQueueSticky.java | 220 +++++++++++
...AllocateMessageQueueStrategyConstants.java | 34 ++
.../rocketmq/example/simple/AclClient.java | 2 +-
.../command/topic/AllocateMQSubCommand.java | 2 +-
38 files changed, 1206 insertions(+), 52 deletions(-)
create mode 100644 broker/src/test/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessorTest.java
create mode 100644 broker/src/test/java/org/apache/rocketmq/broker/rebalance/AllocateMessageQueueStickyTest.java
rename {client/src/main/java/org/apache/rocketmq/client/consumer => common/src/main/java/org/apache/rocketmq/common}/AllocateMessageQueueStrategy.java (97%)
create mode 100644 common/src/main/java/org/apache/rocketmq/common/protocol/body/AllocateMessageQueueRequestBody.java
create mode 100644 common/src/main/java/org/apache/rocketmq/common/protocol/header/AllocateMessageQueueRequestHeader.java
create mode 100644 common/src/main/java/org/apache/rocketmq/common/protocol/header/AllocateMessageQueueResponseBody.java
create mode 100644 common/src/main/java/org/apache/rocketmq/common/protocol/header/AllocateMessageQueueResponseHeader.java
rename {client/src/main/java/org/apache/rocketmq/client/consumer => common/src/main/java/org/apache/rocketmq/common}/rebalance/AllocateMachineRoomNearby.java (94%)
rename {client/src/main/java/org/apache/rocketmq/client/consumer => common/src/main/java/org/apache/rocketmq/common}/rebalance/AllocateMessageQueueAveragely.java (88%)
rename {client/src/main/java/org/apache/rocketmq/client/consumer => common/src/main/java/org/apache/rocketmq/common}/rebalance/AllocateMessageQueueAveragelyByCircle.java (87%)
rename {client/src/main/java/org/apache/rocketmq/client/consumer => common/src/main/java/org/apache/rocketmq/common}/rebalance/AllocateMessageQueueByConfig.java (92%)
rename {client/src/main/java/org/apache/rocketmq/client/consumer => common/src/main/java/org/apache/rocketmq/common}/rebalance/AllocateMessageQueueByMachineRoom.java (95%)
rename {client/src/main/java/org/apache/rocketmq/client/consumer => common/src/main/java/org/apache/rocketmq/common}/rebalance/AllocateMessageQueueConsistentHash.java (92%)
create mode 100644 common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueSticky.java
create mode 100644 common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueStrategyConstants.java
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 194f2850..a39457a8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -73,6 +73,7 @@ import org.apache.rocketmq.broker.transaction.queue.DefaultTransactionalMessageC
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl;
import org.apache.rocketmq.broker.util.ServiceProvider;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.Configuration;
import org.apache.rocketmq.common.DataVersion;
@@ -143,6 +144,8 @@ public class BrokerController {
private final BrokerStatsManager brokerStatsManager;
private final List sendMessageHookList = new ArrayList();
private final List consumeMessageHookList = new ArrayList();
+ private final ConcurrentMap allocateMessageQueueStrategyTable
+ = new ConcurrentHashMap();
private MessageStore messageStore;
private RemotingServer remotingServer;
private RemotingServer fastRemotingServer;
@@ -605,10 +608,12 @@ public class BrokerController {
this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
+ this.remotingServer.registerProcessor(RequestCode.ALLOCATE_MESSAGE_QUEUE, consumerManageProcessor, this.consumerManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
+ this.fastRemotingServer.registerProcessor(RequestCode.ALLOCATE_MESSAGE_QUEUE, consumerManageProcessor, this.consumerManageExecutor);
/**
* EndTransactionProcessor
@@ -741,6 +746,10 @@ public class BrokerController {
return subscriptionGroupManager;
}
+ public ConcurrentMap getAllocateMessageQueueStrategyTable() {
+ return allocateMessageQueueStrategyTable;
+ }
+
public void shutdown() {
if (this.brokerStatsManager != null) {
this.brokerStatsManager.shutdown();
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
index 77317a6f..bc12a38d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
@@ -18,12 +18,18 @@ package org.apache.rocketmq.broker.processor;
import io.netty.channel.ChannelHandlerContext;
import java.util.List;
+import java.util.Map;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.AllocateMessageQueueRequestBody;
+import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueRequestHeader;
+import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueResponseBody;
+import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueResponseHeader;
import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseBody;
import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseHeader;
@@ -31,6 +37,14 @@ import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHead
import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHeader;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragely;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragelyByCircle;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueByConfig;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueByMachineRoom;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueConsistentHash;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueSticky;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueStrategyConstants;
+import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
@@ -57,6 +71,8 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
return this.updateConsumerOffset(ctx, request);
case RequestCode.QUERY_CONSUMER_OFFSET:
return this.queryConsumerOffset(ctx, request);
+ case RequestCode.ALLOCATE_MESSAGE_QUEUE:
+ return this.allocateMessageQueue(ctx, request);
default:
break;
}
@@ -152,4 +168,73 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
return response;
}
+
+ private synchronized RemotingCommand allocateMessageQueue(ChannelHandlerContext ctx, RemotingCommand request)
+ throws RemotingCommandException {
+ final RemotingCommand response =
+ RemotingCommand.createResponseCommand(AllocateMessageQueueResponseHeader.class);
+ final AllocateMessageQueueRequestHeader requestHeader =
+ (AllocateMessageQueueRequestHeader) request.decodeCommandCustomHeader(AllocateMessageQueueRequestHeader.class);
+ final AllocateMessageQueueRequestBody requestBody = AllocateMessageQueueRequestBody.decode(request.getBody(),
+ AllocateMessageQueueRequestBody.class);
+
+ AllocateMessageQueueStrategy strategy = null;
+ String consumerGroup = requestHeader.getConsumerGroup();
+ String strategyName = requestHeader.getStrategyName();
+ Map strategyTable = this.brokerController.getAllocateMessageQueueStrategyTable();
+
+ if (strategyTable.containsKey(consumerGroup) && strategyName.equals(strategyTable.get(consumerGroup).getName())) {
+ strategy = strategyTable.get(consumerGroup);
+ } else {
+ switch (strategyName) {
+ case AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_AVERAGELY:
+ strategy = new AllocateMessageQueueAveragely();
+ break;
+ case AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_AVERAGELY_BY_CIRCLE:
+ strategy = new AllocateMessageQueueAveragelyByCircle();
+ break;
+ case AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_BY_CONFIG:
+ strategy = new AllocateMessageQueueByConfig();
+ break;
+ case AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_BY_MACHINE_ROOM:
+ strategy = new AllocateMessageQueueByMachineRoom();
+ break;
+ case AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_CONSISTENT_HASH:
+ strategy = new AllocateMessageQueueConsistentHash();
+ break;
+ case AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_STICKY:
+ strategy = new AllocateMessageQueueSticky();
+ break;
+ default:
+ response.setCode(ResponseCode.ALLOCATE_MESSAGE_QUEUE_FAILED);
+ response.setRemark("AllocateMessageQueueStrategy[" + strategyName + "] is not supported by broker");
+ return response;
+ }
+ strategyTable.put(consumerGroup, strategy);
+ }
+
+ ConsumerGroupInfo consumerGroupInfo = this.brokerController.getConsumerManager().getConsumerGroupInfo(consumerGroup);
+ List allocateResult = null;
+ try {
+ allocateResult = strategy.allocate(
+ requestHeader.getConsumerGroup(),
+ requestHeader.getClientID(),
+ requestBody.getMqAll(),
+ consumerGroupInfo != null ? consumerGroupInfo.getAllClientId() : null);
+ } catch (Throwable e) {
+ log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}",
+ strategy.getName(), e);
+ response.setCode(ResponseCode.ALLOCATE_MESSAGE_QUEUE_FAILED);
+ response.setRemark(e.getMessage());
+ return response;
+ }
+
+ AllocateMessageQueueResponseBody body = new AllocateMessageQueueResponseBody();
+ body.setAllocateResult(allocateResult);
+ response.setBody(body.encode());
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+
+ return response;
+ }
}
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessorTest.java
new file mode 100644
index 00000000..1cc51d01
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessorTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.processor;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.client.ClientChannelInfo;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.AllocateMessageQueueRequestBody;
+import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueRequestHeader;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueStrategyConstants;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.apache.rocketmq.broker.processor.PullMessageProcessorTest.createConsumerData;
+import static org.assertj.core.api.Assertions.assertThat;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ConsumerManageProcessorTest {
+ private ConsumerManageProcessor consumerManageProcessor;
+ @Spy
+ private BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new MessageStoreConfig());
+ @Mock
+ private ChannelHandlerContext handlerContext;
+ @Mock
+ private Channel channel;
+
+ private ClientChannelInfo clientChannelInfo;
+ private String clientId = UUID.randomUUID().toString();
+ private String group = "FooBarGroup";
+ private String topic = "FooBar";
+ private List mqAll = new ArrayList();
+
+ @Before
+ public void init() {
+ consumerManageProcessor = new ConsumerManageProcessor(brokerController);
+ clientChannelInfo = new ClientChannelInfo(channel, clientId, LanguageCode.JAVA, 100);
+
+ mqAll.add(new MessageQueue(topic, brokerController.getBrokerConfig().getBrokerName(), 0));
+ mqAll.add(new MessageQueue(topic, brokerController.getBrokerConfig().getBrokerName(), 1));
+ mqAll.add(new MessageQueue(topic, brokerController.getBrokerConfig().getBrokerName(), 3));
+ mqAll.add(new MessageQueue(topic, brokerController.getBrokerConfig().getBrokerName(), 4));
+
+ ConsumerData consumerData = createConsumerData(group, topic);
+ brokerController.getConsumerManager().registerConsumer(
+ consumerData.getGroupName(),
+ clientChannelInfo,
+ consumerData.getConsumeType(),
+ consumerData.getMessageModel(),
+ consumerData.getConsumeFromWhere(),
+ consumerData.getSubscriptionDataSet(),
+ false);
+ }
+
+ @Test
+ public void testAllocateMessageQueue() throws RemotingCommandException {
+ String emptyClientId = "";
+ RemotingCommand request = buildAllocateMessageQueueRequest(emptyClientId, AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_AVERAGELY);
+ RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.ALLOCATE_MESSAGE_QUEUE_FAILED);
+ assertThat(response.getRemark()).isEqualTo("currentCID is empty");
+
+ request = buildAllocateMessageQueueRequest(clientId, AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_AVERAGELY);
+ response = consumerManageProcessor.processRequest(handlerContext, request);
+ assertThat(response.getBody()).isNotNull();
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ private RemotingCommand buildAllocateMessageQueueRequest(String clientId, String strategy) {
+ AllocateMessageQueueRequestHeader requestHeader = new AllocateMessageQueueRequestHeader();
+ requestHeader.setConsumerGroup(group);
+ requestHeader.setClientID(clientId);
+ requestHeader.setStrategyName(strategy);
+
+ AllocateMessageQueueRequestBody requestBody = new AllocateMessageQueueRequestBody();
+ requestBody.setMqAll(mqAll);
+
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.ALLOCATE_MESSAGE_QUEUE, requestHeader);
+ request.setBody(requestBody.encode());
+ request.makeCustomHeaderToNet();
+ return request;
+ }
+}
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
index c96f708e..245af534 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
@@ -208,7 +208,7 @@ public class PullMessageProcessorTest {
return request;
}
- static ConsumerData createConsumerData(String group, String topic) {
+ public static ConsumerData createConsumerData(String group, String topic) {
ConsumerData consumerData = new ConsumerData();
consumerData.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumerData.setConsumeType(ConsumeType.CONSUME_PASSIVELY);
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/rebalance/AllocateMessageQueueStickyTest.java b/broker/src/test/java/org/apache/rocketmq/broker/rebalance/AllocateMessageQueueStickyTest.java
new file mode 100644
index 00000000..e9082274
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/rebalance/AllocateMessageQueueStickyTest.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.rebalance;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.client.ClientChannelInfo;
+import org.apache.rocketmq.broker.processor.ConsumerManageProcessor;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.AllocateMessageQueueRequestBody;
+import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueRequestHeader;
+import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueResponseBody;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueStrategyConstants;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.apache.rocketmq.broker.processor.PullMessageProcessorTest.createConsumerData;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+
+@RunWith(MockitoJUnitRunner.class)
+public class AllocateMessageQueueStickyTest {
+ private ConsumerManageProcessor consumerManageProcessor;
+ @Spy
+ private final BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new MessageStoreConfig());
+ @Mock
+ private ChannelHandlerContext handlerContext;
+
+ private static final String CID_PREFIX = "CID-";
+
+ private final String group = "FooBarGroup";
+ private final String topic = "FooBar";
+ private List messageQueueList;
+ private List consumerIdList;
+
+ @Before
+ public void init() {
+ consumerManageProcessor = new ConsumerManageProcessor(brokerController);
+ messageQueueList = new ArrayList();
+ consumerIdList = new ArrayList();
+ }
+
+ @Test
+ public void testCurrentCIDNotExists() throws RemotingCommandException {
+ String currentCID = String.valueOf(Integer.MAX_VALUE);
+ createConsumerIdList(4);
+ createMessageQueueList(15);
+ RemotingCommand request = buildAllocateMessageQueueRequest(currentCID, messageQueueList);
+ RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
+ Assert.assertEquals(AllocateMessageQueueRequestBody.decode(response.getBody(),
+ AllocateMessageQueueResponseBody.class).getAllocateResult().size(), 0);
+ }
+
+ @Test
+ public void testCurrentCIDIllegalArgument() throws RemotingCommandException {
+ String currentCID = "";
+ createConsumerIdList(4);
+ createMessageQueueList(15);
+ RemotingCommand request = buildAllocateMessageQueueRequest(currentCID, messageQueueList);
+ RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.ALLOCATE_MESSAGE_QUEUE_FAILED);
+ assertThat(response.getRemark()).isEqualTo("currentCID is empty");
+ }
+
+ @Test
+ public void testMessageQueueIllegalArgument() throws RemotingCommandException {
+ String currentCID = CID_PREFIX + 0;
+ createConsumerIdList(4);
+ createMessageQueueList(0);
+ RemotingCommand request = buildAllocateMessageQueueRequest(currentCID, messageQueueList);
+ RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.ALLOCATE_MESSAGE_QUEUE_FAILED);
+ assertThat(response.getRemark()).isEqualTo("mqAll is null or mqAll empty");
+ }
+
+ @Test
+ public void testConsumerIdIllegalArgument() throws RemotingCommandException {
+ String currentCID = CID_PREFIX + 0;
+ createConsumerIdList(0);
+ createMessageQueueList(15);
+ RemotingCommand request = buildAllocateMessageQueueRequest(currentCID, messageQueueList);
+ RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.ALLOCATE_MESSAGE_QUEUE_FAILED);
+ assertThat(response.getRemark()).isEqualTo("cidAll is null or cidAll empty");
+ }
+
+ @Test
+ public void testAllocateMessageQueue1() throws RemotingCommandException {
+ createConsumerIdList(4);
+ createMessageQueueList(10);
+ for (String clientId : consumerIdList) {
+ RemotingCommand request = buildAllocateMessageQueueRequest(clientId, messageQueueList);
+ RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
+ assertThat(response.getBody()).isNotNull();
+ System.out.println(AllocateMessageQueueRequestBody.decode(response.getBody(),
+ AllocateMessageQueueResponseBody.class).getAllocateResult());
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+ }
+
+ @Test
+ public void testAllocateMessageQueue2() throws RemotingCommandException {
+ createConsumerIdList(10);
+ createMessageQueueList(4);
+ for (String clientId : consumerIdList) {
+ RemotingCommand request = buildAllocateMessageQueueRequest(clientId, messageQueueList);
+ RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
+ assertThat(response.getBody()).isNotNull();
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+ }
+
+ @Test
+ public void testRun10RandomCase() throws RemotingCommandException {
+ for (int i = 0; i < 10; i++) {
+ int consumerSize = new Random().nextInt(20) + 2;
+ int queueSize = new Random().nextInt(20) + 4;
+ testAllocateMessageQueue(consumerSize, queueSize);
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private void testAllocateMessageQueue(int consumerSize, int queueSize) throws RemotingCommandException {
+ createConsumerIdList(consumerSize);
+ createMessageQueueList(queueSize);
+
+ Map allocateToAllOrigin = new TreeMap();
+ List allocatedResAll = new ArrayList();
+ // test allocate all
+ {
+ List cidBegin = new ArrayList(consumerIdList);
+ for (String cid : cidBegin) {
+ RemotingCommand request = buildAllocateMessageQueueRequest(cid, messageQueueList);
+ RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
+ assertThat(response.getBody()).isNotNull();
+ List rs = AllocateMessageQueueRequestBody.decode(response.getBody(),
+ AllocateMessageQueueResponseBody.class).getAllocateResult();
+ for (MessageQueue mq : rs) {
+ allocateToAllOrigin.put(mq, cid);
+ }
+ allocatedResAll.addAll(rs);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ Assert.assertTrue(verifyAllocateAllConsumer(cidBegin, messageQueueList, allocatedResAll));
+ }
+
+ Map allocateToAllAfterRemoveOneConsumer = new TreeMap();
+ List cidAfterRemoveOneConsumer = new ArrayList(consumerIdList);
+ // test allocate after removing one cid
+ {
+ String removeCID = cidAfterRemoveOneConsumer.remove(0);
+ unregisterConsumer(removeCID);
+ List mqShouldBeChanged = new ArrayList();
+ for (Map.Entry entry : allocateToAllOrigin.entrySet()) {
+ if (entry.getValue().equals(removeCID)) {
+ mqShouldBeChanged.add(entry.getKey());
+ }
+ }
+
+ List allocatedResAllAfterRemoveOneConsumer = new ArrayList();
+ for (String cid : cidAfterRemoveOneConsumer) {
+ RemotingCommand request = buildAllocateMessageQueueRequest(cid, messageQueueList);
+ RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
+ assertThat(response.getBody()).isNotNull();
+ List rs = AllocateMessageQueueRequestBody.decode(response.getBody(),
+ AllocateMessageQueueResponseBody.class).getAllocateResult();
+ allocatedResAllAfterRemoveOneConsumer.addAll(rs);
+ for (MessageQueue mq : rs) {
+ allocateToAllAfterRemoveOneConsumer.put(mq, cid);
+ }
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ Assert.assertTrue(verifyAllocateAllConsumer(cidAfterRemoveOneConsumer, messageQueueList, allocatedResAllAfterRemoveOneConsumer));
+ verifyAfterRemoveConsumer(allocateToAllOrigin, allocateToAllAfterRemoveOneConsumer, removeCID);
+ }
+
+ Map allocateToAllAfterAddOneConsumer = new TreeMap();
+ List cidAfterAddOneConsumer = new ArrayList(cidAfterRemoveOneConsumer);
+ // test allocate after adding one more cid
+ {
+ String newCid = CID_PREFIX + "NEW";
+ cidAfterAddOneConsumer.add(newCid);
+ registerConsumer(newCid);
+
+ List mqShouldOnlyChanged = new ArrayList();
+ List allocatedResAllAfterAddOneConsumer = new ArrayList();
+ for (String cid : cidAfterAddOneConsumer) {
+ RemotingCommand request = buildAllocateMessageQueueRequest(cid, messageQueueList);
+ RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
+ assertThat(response.getBody()).isNotNull();
+ List rs = AllocateMessageQueueRequestBody.decode(response.getBody(),
+ AllocateMessageQueueResponseBody.class).getAllocateResult();
+ allocatedResAllAfterAddOneConsumer.addAll(rs);
+ for (MessageQueue mq : rs) {
+ allocateToAllAfterAddOneConsumer.put(mq, cid);
+ if (cid.equals(newCid)) {
+ mqShouldOnlyChanged.add(mq);
+ }
+ }
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ Assert.assertTrue(verifyAllocateAllConsumer(cidAfterAddOneConsumer, messageQueueList, allocatedResAllAfterAddOneConsumer));
+ verifyAfterAddConsumer(allocateToAllAfterRemoveOneConsumer, allocateToAllAfterAddOneConsumer, newCid);
+ }
+
+ Map allocateToAllAfterRemoveTwoMq = new TreeMap();
+ List mqAfterRemoveTwoMq = new ArrayList<>(messageQueueList);
+ // test allocate after removing two message queues
+ {
+ for (int i = 0; i < 2; i++) {
+ mqAfterRemoveTwoMq.remove(i);
+ }
+
+ List allocatedResAfterRemoveTwoMq = new ArrayList();
+ for (String cid : cidAfterAddOneConsumer) {
+ RemotingCommand request = buildAllocateMessageQueueRequest(cid, mqAfterRemoveTwoMq);
+ RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
+ assertThat(response.getBody()).isNotNull();
+ List rs = AllocateMessageQueueRequestBody.decode(response.getBody(),
+ AllocateMessageQueueResponseBody.class).getAllocateResult();
+ allocatedResAfterRemoveTwoMq.addAll(rs);
+ for (MessageQueue mq : rs) {
+ allocateToAllAfterRemoveTwoMq.put(mq, cid);
+ }
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ Assert.assertTrue(verifyAllocateAllConsumer(cidAfterAddOneConsumer, mqAfterRemoveTwoMq, allocatedResAfterRemoveTwoMq));
+ }
+ }
+
+ private void verifyAfterAddConsumer(Map allocateBefore,
+ Map allocateAfter, String newCID) {
+ for (MessageQueue mq : allocateAfter.keySet()) {
+ String allocateToOrigin = allocateBefore.get(mq);
+ String allocateToAfter = allocateAfter.get(mq);
+ if (!allocateToAfter.equals(newCID)) { // the rest queues should be the same
+ Assert.assertEquals(allocateToAfter, allocateToOrigin);
+ }
+ }
+ }
+
+ private void verifyAfterRemoveConsumer(Map allocateToBefore,
+ Map allocateAfter, String removeCID) {
+ for (MessageQueue mq : allocateToBefore.keySet()) {
+ String allocateToOrigin = allocateToBefore.get(mq);
+ String allocateToAfter = allocateAfter.get(mq);
+ if (!allocateToOrigin.equals(removeCID)) { // the rest queues should be the same
+ Assert.assertEquals(allocateToAfter, allocateToOrigin); // should be the same
+ }
+ }
+ }
+
+ private boolean verifyAllocateAllConsumer(List cidAll, List mqAll,
+ List allocatedResAll) {
+ if (cidAll.isEmpty()) {
+ return allocatedResAll.isEmpty();
+ }
+ return mqAll.containsAll(allocatedResAll) && allocatedResAll.containsAll(mqAll);
+ }
+
+ private void registerConsumer(String clientId) {
+ Channel channel = mock(Channel.class);
+ ClientChannelInfo clientChannelInfo = new ClientChannelInfo(channel, clientId, LanguageCode.JAVA, 100);
+ ConsumerData consumerData = createConsumerData(group, topic);
+ brokerController.getConsumerManager().registerConsumer(
+ consumerData.getGroupName(),
+ clientChannelInfo,
+ consumerData.getConsumeType(),
+ consumerData.getMessageModel(),
+ consumerData.getConsumeFromWhere(),
+ consumerData.getSubscriptionDataSet(),
+ false);
+ consumerIdList.add(clientId);
+ }
+
+ private void unregisterConsumer(String clientId) {
+ Channel channel = brokerController.getConsumerManager().getConsumerGroupInfo(group).findChannel(clientId).getChannel();
+ brokerController.getConsumerManager().unregisterConsumer(group,
+ new ClientChannelInfo(channel, clientId, LanguageCode.JAVA, 100), true);
+ consumerIdList.remove(clientId);
+ }
+
+ private RemotingCommand buildAllocateMessageQueueRequest(String clientId, List messageQueueList) {
+ AllocateMessageQueueRequestHeader requestHeader = new AllocateMessageQueueRequestHeader();
+ requestHeader.setConsumerGroup(group);
+ requestHeader.setClientID(clientId);
+ requestHeader.setStrategyName(AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_STICKY);
+
+ AllocateMessageQueueRequestBody requestBody = new AllocateMessageQueueRequestBody();
+ requestBody.setMqAll(messageQueueList);
+
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.ALLOCATE_MESSAGE_QUEUE, requestHeader);
+ request.setBody(requestBody.encode());
+ request.makeCustomHeaderToNet();
+
+ return request;
+ }
+
+ private void createConsumerIdList(int size) {
+ for (int i = 0; i < size; i++) {
+ String clientId = CID_PREFIX + i;
+ registerConsumer(clientId);
+ }
+ }
+
+ private void createMessageQueueList(int size) {
+ for (int i = 0; i < size; i++) {
+ MessageQueue mq = new MessageQueue(topic, "brokerName", i);
+ messageQueueList.add(mq);
+ }
+ }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index 6718eb55..3ec19f4f 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -19,10 +19,10 @@ package org.apache.rocketmq.client.consumer;
import java.util.Collection;
import java.util.List;
import org.apache.rocketmq.client.ClientConfig;
-import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
@@ -30,6 +30,7 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.remoting.RPCHook;
public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer {
@@ -66,6 +67,11 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
* Consumption pattern,default is clustering
*/
private MessageModel messageModel = MessageModel.CLUSTERING;
+
+ /**
+ * The switch for applying the rebalancing calculation task at the broker side
+ */
+ private boolean rebalanceByBroker = false;
/**
* Message queue listener
*/
@@ -409,6 +415,14 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
this.messageModel = messageModel;
}
+ public boolean isRebalanceByBroker() {
+ return rebalanceByBroker;
+ }
+
+ public void setRebalanceByBroker(boolean rebalanceByBroker) {
+ this.rebalanceByBroker = rebalanceByBroker;
+ }
+
public String getConsumerGroup() {
return consumerGroup;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
index 0876a94e..b9069dca 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
@@ -20,17 +20,18 @@ import java.util.HashSet;
import java.util.Set;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.QueryResult;
-import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -65,6 +66,10 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
* Consumption pattern,default is clustering
*/
private MessageModel messageModel = MessageModel.CLUSTERING;
+ /**
+ * The switch for applying the rebalancing calculation task at the broker side
+ */
+ private boolean rebalanceByBroker = false;
/**
* Message queue listener
*/
@@ -245,6 +250,14 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
this.messageModel = messageModel;
}
+ public boolean isRebalanceByBroker() {
+ return rebalanceByBroker;
+ }
+
+ public void setRebalanceByBroker(boolean rebalanceByBroker) {
+ this.rebalanceByBroker = rebalanceByBroker;
+ }
+
public MessageQueueListener getMessageQueueListener() {
return messageQueueListener;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index 9011117a..743dda76 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -24,7 +24,6 @@ import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
-import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
@@ -33,6 +32,7 @@ import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.client.trace.hook.ConsumeMessageTraceHookImpl;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
@@ -41,6 +41,7 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -92,6 +93,19 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*/
private MessageModel messageModel = MessageModel.CLUSTERING;
+ /**
+ * The switch for applying the rebalancing calculation task at the broker side.
+ *
+ *
+ * RocketMQ supports two message models: clustering and broadcasting. If clustering is set, consumer clients with
+ * the same {@link #consumerGroup} would execute rebalancing calculations at the client side in default. The switch
+ * is responsible for shifting the rebalancing calculation task to the broker side.
+ *
+ *
+ * This field defaults to false.
+ */
+ private boolean rebalanceByBroker = false;
+
/**
* Consuming point on consumer booting.
*
@@ -574,6 +588,14 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
this.messageModel = messageModel;
}
+ public boolean isRebalanceByBroker() {
+ return rebalanceByBroker;
+ }
+
+ public void setRebalanceByBroker(boolean rebalanceByBroker) {
+ this.rebalanceByBroker = rebalanceByBroker;
+ }
+
public int getPullBatchSize() {
return pullBatchSize;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index c64d7c56..6ebb8ad1 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -63,6 +63,7 @@ import org.apache.rocketmq.common.namesrv.TopAddressing;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.AllocateMessageQueueRequestBody;
import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
import org.apache.rocketmq.common.protocol.body.CheckClientRequestBody;
import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo;
@@ -86,6 +87,8 @@ import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody;
+import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueRequestHeader;
+import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueResponseBody;
import org.apache.rocketmq.common.protocol.header.CloneGroupOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
@@ -901,6 +904,39 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
+ public List getAllocateResultByStrategy(final String addr, final String group, final String clientId,
+ final String strategyName, final List mqAll, final long timeoutMillis)
+ throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
+ MQBrokerException, InterruptedException {
+ AllocateMessageQueueRequestHeader requestHeader = new AllocateMessageQueueRequestHeader();
+ requestHeader.setConsumerGroup(group);
+ requestHeader.setClientID(clientId);
+ requestHeader.setStrategyName(strategyName);
+
+ AllocateMessageQueueRequestBody requestBody = new AllocateMessageQueueRequestBody();
+ requestBody.setMqAll(mqAll);
+
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.ALLOCATE_MESSAGE_QUEUE, requestHeader);
+ request.setBody(requestBody.encode());
+
+ RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+ request, timeoutMillis);
+ assert response != null;
+ switch (response.getCode()) {
+ case ResponseCode.SUCCESS: {
+ if (response.getBody() != null) {
+ AllocateMessageQueueResponseBody body = AllocateMessageQueueRequestBody.decode(response.getBody(),
+ AllocateMessageQueueResponseBody.class);
+ return body.getAllocateResult();
+ }
+ }
+ default:
+ break;
+ }
+
+ throw new MQBrokerException(response.getCode(), response.getRemark());
+ }
+
public long getMinOffset(final String addr, final String topic, final int queueId, final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException {
GetMinOffsetRequestHeader requestHeader = new GetMinOffsetRequestHeader();
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index e3d60ffa..5d06df4d 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -873,6 +873,11 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
return this.defaultLitePullConsumer.getMessageModel();
}
+ @Override
+ public boolean rebalanceByBroker() {
+ return this.defaultLitePullConsumer.isRebalanceByBroker();
+ }
+
@Override
public ConsumeType consumeType() {
return ConsumeType.CONSUME_ACTIVELY;
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index afd72a08..8bd1c311 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -345,6 +345,11 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
return this.defaultMQPullConsumer.getMessageModel();
}
+ @Override
+ public boolean rebalanceByBroker() {
+ return this.defaultMQPullConsumer.isRebalanceByBroker();
+ }
+
@Override
public ConsumeType consumeType() {
return ConsumeType.CONSUME_ACTIVELY;
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index ab585ea4..afefd971 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -990,6 +990,11 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
return this.defaultMQPushConsumer.getMessageModel();
}
+ @Override
+ public boolean rebalanceByBroker() {
+ return this.defaultMQPushConsumer.isRebalanceByBroker();
+ }
+
@Override
public ConsumeType consumeType() {
return ConsumeType.CONSUME_PASSIVELY;
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java
index c2e8a1df..242820f0 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java
@@ -32,6 +32,8 @@ public interface MQConsumerInner {
MessageModel messageModel();
+ boolean rebalanceByBroker();
+
ConsumeType consumeType();
ConsumeFromWhere consumeFromWhere();
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
index b8972a92..a5a22fe5 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
@@ -27,18 +27,19 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody;
import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueStrategyConstants;
+import org.apache.rocketmq.logging.InternalLogger;
public abstract class RebalanceImpl {
protected static final InternalLogger log = ClientLogger.getLog();
@@ -277,16 +278,30 @@ public abstract class RebalanceImpl {
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List allocateResult = null;
- try {
- allocateResult = strategy.allocate(
+
+ if (isRebalanceByBroker()) {
+ allocateResult = this.mQClientFactory.getAllocateResult(
+ topic,
this.consumerGroup,
- this.mQClientFactory.getClientId(),
- mqAll,
- cidAll);
- } catch (Throwable e) {
- log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
- e);
- return;
+ strategy.getName(),
+ mqAll);
+ } else {
+ if (strategy.getName().equals(AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_STICKY)) {
+ log.error("AllocateMessageQueueStrategy is not supported while rebalanceByBroker=false. allocateMessageQueueStrategyName={}",
+ strategy.getName());
+ return;
+ }
+ try {
+ allocateResult = strategy.allocate(
+ this.consumerGroup,
+ this.mQClientFactory.getClientId(),
+ mqAll,
+ cidAll);
+ } catch (Throwable e) {
+ log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
+ e);
+ return;
+ }
}
Set allocateResultSet = new HashSet();
@@ -412,6 +427,8 @@ public abstract class RebalanceImpl {
public abstract void dispatchPullRequest(final List pullRequestList);
+ public abstract boolean isRebalanceByBroker();
+
public void removeProcessQueue(final MessageQueue mq) {
ProcessQueue prev = this.processQueueTable.remove(mq);
if (prev != null) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
index 9d1ea749..31c3fdaf 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
@@ -18,11 +18,11 @@ package org.apache.rocketmq.client.impl.consumer;
import java.util.List;
import java.util.Set;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
@@ -142,4 +142,9 @@ public class RebalanceLitePullImpl extends RebalanceImpl {
public void dispatchPullRequest(List pullRequestList) {
}
+ @Override
+ public boolean isRebalanceByBroker() {
+ return this.litePullConsumerImpl.rebalanceByBroker();
+ }
+
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java
index 9dd408c1..e9373f6f 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java
@@ -18,9 +18,9 @@ package org.apache.rocketmq.client.impl.consumer;
import java.util.List;
import java.util.Set;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
@@ -76,4 +76,9 @@ public class RebalancePullImpl extends RebalanceImpl {
@Override
public void dispatchPullRequest(List pullRequestList) {
}
+
+ @Override
+ public boolean isRebalanceByBroker() {
+ return this.defaultMQPullConsumerImpl.rebalanceByBroker();
+ }
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
index e5166f35..8341480d 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
@@ -19,11 +19,11 @@ package org.apache.rocketmq.client.impl.consumer;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
@@ -218,4 +218,9 @@ public class RebalancePushImpl extends RebalanceImpl {
log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
}
}
+
+ @Override
+ public boolean isRebalanceByBroker() {
+ return this.defaultMQPushConsumerImpl.rebalanceByBroker();
+ }
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index e937ce39..aad51342 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.client.impl.factory;
import java.io.UnsupportedEncodingException;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -442,7 +443,7 @@ public class MQClientInstance {
}
// may need to check one broker every cluster...
// assume that the configs of every broker in cluster are the the same.
- String addr = findBrokerAddrByTopic(subscriptionData.getTopic());
+ String addr = findRandomBrokerAddrByTopic(subscriptionData.getTopic());
if (addr != null) {
try {
@@ -1069,10 +1070,10 @@ public class MQClientInstance {
}
public List findConsumerIdList(final String topic, final String group) {
- String brokerAddr = this.findBrokerAddrByTopic(topic);
+ String brokerAddr = this.findRandomBrokerAddrByTopic(topic);
if (null == brokerAddr) {
this.updateTopicRouteInfoFromNameServer(topic);
- brokerAddr = this.findBrokerAddrByTopic(topic);
+ brokerAddr = this.findRandomBrokerAddrByTopic(topic);
}
if (null != brokerAddr) {
@@ -1086,7 +1087,7 @@ public class MQClientInstance {
return null;
}
- public String findBrokerAddrByTopic(final String topic) {
+ public String findRandomBrokerAddrByTopic(final String topic) {
TopicRouteData topicRouteData = this.topicRouteTable.get(topic);
if (topicRouteData != null) {
List brokers = topicRouteData.getBrokerDatas();
@@ -1100,6 +1101,24 @@ public class MQClientInstance {
return null;
}
+ public String findUniqueBrokerAddrByTopic(final String topic) {
+ TopicRouteData topicRouteData = this.topicRouteTable.get(topic);
+ if (topicRouteData != null) {
+ List brokers = topicRouteData.getBrokerDatas();
+ if (!brokers.isEmpty()) {
+ Collections.sort(brokers, new Comparator() {
+ @Override public int compare(BrokerData o1, BrokerData o2) {
+ return o1.getBrokerName().compareTo(o2.getBrokerName());
+ }
+ });
+ BrokerData bd = brokers.get(0);
+ return bd.selectBrokerAddr();
+ }
+ }
+
+ return null;
+ }
+
public void resetOffset(String topic, String group, Map offsetTable) {
DefaultMQPushConsumerImpl consumer = null;
try {
@@ -1161,6 +1180,26 @@ public class MQClientInstance {
}
}
+ public List getAllocateResult(final String topic, final String group, final String strategyName,
+ final List mqAll) {
+ String brokerAddr = this.findUniqueBrokerAddrByTopic(topic);
+ if (null == brokerAddr) {
+ this.updateTopicRouteInfoFromNameServer(topic);
+ brokerAddr = this.findUniqueBrokerAddrByTopic(topic);
+ }
+
+ if (null != brokerAddr) {
+ try {
+ return this.mQClientAPIImpl.getAllocateResultByStrategy(brokerAddr, group, clientId, strategyName,
+ mqAll, 3000);
+ } catch (Exception e) {
+ log.warn("getAllocateResultByStrategy exception, {} {}", brokerAddr, group, e);
+ }
+ }
+
+ return null;
+ }
+
public TopicRouteData getAnExistTopicRouteData(final String topic) {
return this.topicRouteTable.get(topic);
}
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearByTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearByTest.java
index 0d394c38..755ad905 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearByTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearByTest.java
@@ -23,8 +23,10 @@ import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.rebalance.AllocateMachineRoomNearby;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragely;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java
index 98ce7b6e..aa9cfba3 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java
@@ -22,8 +22,9 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueConsistentHash;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java
index 796a3943..68260c12 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java
@@ -20,13 +20,13 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragely;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/AllocateMessageQueueStrategy.java b/common/src/main/java/org/apache/rocketmq/common/AllocateMessageQueueStrategy.java
similarity index 97%
rename from client/src/main/java/org/apache/rocketmq/client/consumer/AllocateMessageQueueStrategy.java
rename to common/src/main/java/org/apache/rocketmq/common/AllocateMessageQueueStrategy.java
index c1f06040..e4676097 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/AllocateMessageQueueStrategy.java
+++ b/common/src/main/java/org/apache/rocketmq/common/AllocateMessageQueueStrategy.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.client.consumer;
+package org.apache.rocketmq.common;
import java.util.List;
import org.apache.rocketmq.common.message.MessageQueue;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index 75ceff38..90642c2e 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -33,6 +33,8 @@ public class RequestCode {
public static final int GET_TOPIC_NAME_LIST = 23;
+ public static final int ALLOCATE_MESSAGE_QUEUE = 24;
+
public static final int UPDATE_BROKER_CONFIG = 25;
public static final int GET_BROKER_CONFIG = 26;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
index dc744448..0cef0af9 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
@@ -80,4 +80,5 @@ public class ResponseCode extends RemotingSysResponseCode {
public static final int UPDATE_GLOBAL_WHITE_ADDRS_CONFIG_FAILED = 211;
+ public static final int ALLOCATE_MESSAGE_QUEUE_FAILED = 212;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/AllocateMessageQueueRequestBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/AllocateMessageQueueRequestBody.java
new file mode 100644
index 00000000..3e051585
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/AllocateMessageQueueRequestBody.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+import java.util.List;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+public class AllocateMessageQueueRequestBody extends RemotingSerializable {
+ private List mqAll;
+
+ public List getMqAll() {
+ return mqAll;
+ }
+
+ public void setMqAll(List mqAll) {
+ this.mqAll = mqAll;
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/AllocateMessageQueueRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/AllocateMessageQueueRequestHeader.java
new file mode 100644
index 00000000..4c0c11ef
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/AllocateMessageQueueRequestHeader.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class AllocateMessageQueueRequestHeader implements CommandCustomHeader {
+ @CFNotNull
+ private String consumerGroup;
+ @CFNotNull
+ private String clientID;
+ @CFNotNull
+ private String strategyName;
+
+ @Override public void checkFields() throws RemotingCommandException {
+ }
+
+ public String getConsumerGroup() {
+ return consumerGroup;
+ }
+
+ public void setConsumerGroup(String consumerGroup) {
+ this.consumerGroup = consumerGroup;
+ }
+
+ public String getClientID() {
+ return clientID;
+ }
+
+ public void setClientID(String clientID) {
+ this.clientID = clientID;
+ }
+
+ public String getStrategyName() {
+ return strategyName;
+ }
+
+ public void setStrategyName(String strategyName) {
+ this.strategyName = strategyName;
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/AllocateMessageQueueResponseBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/AllocateMessageQueueResponseBody.java
new file mode 100644
index 00000000..a1d4f478
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/AllocateMessageQueueResponseBody.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.header;
+
+import java.util.List;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+public class AllocateMessageQueueResponseBody extends RemotingSerializable {
+ private List allocateResult;
+
+ public List getAllocateResult() {
+ return allocateResult;
+ }
+
+ public void setAllocateResult(List allocateResult) {
+ this.allocateResult = allocateResult;
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/AllocateMessageQueueResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/AllocateMessageQueueResponseHeader.java
new file mode 100644
index 00000000..7fc460b9
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/AllocateMessageQueueResponseHeader.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class AllocateMessageQueueResponseHeader implements CommandCustomHeader {
+
+ @Override public void checkFields() throws RemotingCommandException {
+ }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearby.java b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMachineRoomNearby.java
similarity index 94%
rename from client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearby.java
rename to common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMachineRoomNearby.java
index ec0f7f64..4750ba21 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearby.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMachineRoomNearby.java
@@ -14,17 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.client.consumer.rebalance;
+package org.apache.rocketmq.common.rebalance;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
-import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
/**
* An allocate strategy proxy for based on machine room nearside priority. An actual allocate strategy can be
@@ -35,7 +36,7 @@ import org.apache.rocketmq.logging.InternalLogger;
* no alive consumer to monopolize them.
*/
public class AllocateMachineRoomNearby implements AllocateMessageQueueStrategy {
- private final InternalLogger log = ClientLogger.getLog();
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
private final AllocateMessageQueueStrategy allocateMessageQueueStrategy;//actual allocate strategy
private final MachineRoomResolver machineRoomResolver;
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueAveragely.java
similarity index 88%
rename from client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java
rename to common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueAveragely.java
index 155e692a..525bc821 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueAveragely.java
@@ -14,20 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.client.consumer.rebalance;
+package org.apache.rocketmq.common.rebalance;
import java.util.ArrayList;
import java.util.List;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
-import org.apache.rocketmq.client.log.ClientLogger;
-import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
/**
* Average Hashing queue algorithm
*/
public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
- private final InternalLogger log = ClientLogger.getLog();
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
@Override
public List allocate(String consumerGroup, String currentCID, List mqAll,
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueAveragelyByCircle.java
similarity index 87%
rename from client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java
rename to common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueAveragelyByCircle.java
index fe78f0a6..4bdfa45c 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueAveragelyByCircle.java
@@ -14,20 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.client.consumer.rebalance;
+package org.apache.rocketmq.common.rebalance;
import java.util.ArrayList;
import java.util.List;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
-import org.apache.rocketmq.client.log.ClientLogger;
-import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
/**
* Cycle average Hashing queue algorithm
*/
public class AllocateMessageQueueAveragelyByCircle implements AllocateMessageQueueStrategy {
- private final InternalLogger log = ClientLogger.getLog();
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
@Override
public List allocate(String consumerGroup, String currentCID, List mqAll,
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueByConfig.java
similarity index 92%
rename from client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java
rename to common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueByConfig.java
index e548803d..a5aaf663 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueByConfig.java
@@ -14,10 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.client.consumer.rebalance;
+package org.apache.rocketmq.common.rebalance;
import java.util.List;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.message.MessageQueue;
public class AllocateMessageQueueByConfig implements AllocateMessageQueueStrategy {
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueByMachineRoom.java
similarity index 95%
rename from client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java
rename to common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueByMachineRoom.java
index 37568317..bac3dac4 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueByMachineRoom.java
@@ -14,12 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.client.consumer.rebalance;
+package org.apache.rocketmq.common.rebalance;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.message.MessageQueue;
/**
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueConsistentHash.java
similarity index 92%
rename from client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java
rename to common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueConsistentHash.java
index 65dcf799..8baf7c7c 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueConsistentHash.java
@@ -14,24 +14,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.client.consumer.rebalance;
+package org.apache.rocketmq.common.rebalance;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
-import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.consistenthash.ConsistentHashRouter;
import org.apache.rocketmq.common.consistenthash.HashFunction;
import org.apache.rocketmq.common.consistenthash.Node;
-import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
/**
* Consistent Hashing queue algorithm
*/
public class AllocateMessageQueueConsistentHash implements AllocateMessageQueueStrategy {
- private final InternalLogger log = ClientLogger.getLog();
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
private final int virtualNodeCnt;
private final HashFunction customHashFunction;
diff --git a/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueSticky.java b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueSticky.java
new file mode 100644
index 00000000..06a9abae
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueSticky.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.rebalance;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+
+public class AllocateMessageQueueSticky implements AllocateMessageQueueStrategy {
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
+
+ private final Map> messageQueueAllocation = new HashMap>();
+
+ private final List unassignedQueues = new ArrayList();
+
+ @Override
+ public List allocate(String consumerGroup, String currentCID,
+ List mqAll, final List cidAll) {
+ if (currentCID == null || currentCID.length() < 1) {
+ throw new IllegalArgumentException("currentCID is empty");
+ }
+ if (mqAll == null || mqAll.isEmpty()) {
+ throw new IllegalArgumentException("mqAll is null or mqAll empty");
+ }
+ if (cidAll == null || cidAll.isEmpty()) {
+ throw new IllegalArgumentException("cidAll is null or cidAll empty");
+ }
+
+ List result = new ArrayList();
+ if (!cidAll.contains(currentCID)) {
+ log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
+ consumerGroup,
+ currentCID,
+ cidAll);
+ return result;
+ }
+
+ // Rebalance (or fresh assignment) needed
+ if (cidAll.size() != messageQueueAllocation.size() || mqAll.size() != getPrevMqAll().size()) {
+ // Update messageQueueAllocation
+ updateMessageQueueAllocation(mqAll, cidAll);
+
+ // Sort consumers based on how many message queues are assigned to them
+ TreeSet sortedSubscriptions = new TreeSet(new ConsumerComparator(messageQueueAllocation));
+ sortedSubscriptions.addAll(messageQueueAllocation.keySet());
+
+ // Assign unassignedQueues to consumers so that queues allocations are as balanced as possible
+ for (MessageQueue mq : unassignedQueues) {
+ String consumer = sortedSubscriptions.first();
+ sortedSubscriptions.remove(consumer);
+ messageQueueAllocation.get(consumer).add(mq);
+ sortedSubscriptions.add(consumer);
+ }
+ unassignedQueues.clear();
+
+ // Reassignment until no message queue can be moved to improve the balance
+ Map> preBalanceAllocation = new HashMap>(messageQueueAllocation);
+ while (!isBalanced(sortedSubscriptions)) {
+ String leastSubscribedConsumer = sortedSubscriptions.first();
+ String mostSubscribedConsumer = sortedSubscriptions.last();
+ MessageQueue mqFromMostSubscribedConsumer = messageQueueAllocation.get(mostSubscribedConsumer).get(0);
+ messageQueueAllocation.get(leastSubscribedConsumer).add(mqFromMostSubscribedConsumer);
+ messageQueueAllocation.get(mostSubscribedConsumer).remove(mqFromMostSubscribedConsumer);
+ }
+
+ // Make sure it is getting a more balanced allocation than before; otherwise, revert to previous allocation
+ if (getBalanceScore(messageQueueAllocation) >= getBalanceScore(preBalanceAllocation)) {
+ deepCopy(preBalanceAllocation, messageQueueAllocation);
+ }
+ }
+
+ return messageQueueAllocation.get(currentCID);
+ }
+
+ private void updateMessageQueueAllocation(List mqAll, List cidAll) {
+ // The current size of consumers is larger than before
+ if (cidAll.size() > messageQueueAllocation.size()) {
+ for (String cid : cidAll) {
+ if (!messageQueueAllocation.containsKey(cid)) {
+ messageQueueAllocation.put(cid, new ArrayList());
+ }
+ }
+ }
+
+ // The current size of consumers is smaller than before
+ if (cidAll.size() < messageQueueAllocation.size()) {
+ Iterator>> it = messageQueueAllocation.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry> entry = it.next();
+ if (!cidAll.contains(entry.getKey())) {
+ it.remove();
+ }
+ }
+ }
+
+ // The current size of message queues is larger than before
+ List prevMqAll = getPrevMqAll();
+ if (mqAll.size() > prevMqAll.size()) {
+ for (MessageQueue mq : mqAll) {
+ if (!prevMqAll.contains(mq)) {
+ unassignedQueues.add(mq);
+ }
+ }
+ }
+
+ // The current size of message queues is smaller than before
+ if (mqAll.size() < prevMqAll.size()) {
+ for (MessageQueue prevMq : prevMqAll) {
+ if (!isSameQueueIdExists(mqAll, prevMq.getQueueId())) {
+ for (List prevMqs : messageQueueAllocation.values()) {
+ prevMqs.remove(prevMq);
+ }
+ }
+ }
+ }
+ }
+
+ private static class ConsumerComparator implements Comparator, Serializable {
+ private final Map> map;
+
+ ConsumerComparator(Map> map) {
+ this.map = map;
+ }
+
+ @Override
+ public int compare(String o1, String o2) {
+ int ret = map.get(o1).size() - map.get(o2).size();
+ if (ret == 0) {
+ ret = o1.compareTo(o2);
+ }
+ return ret;
+ }
+ }
+
+ private boolean isSameQueueIdExists(List mqAll, int prevMqId) {
+ for (MessageQueue mq : mqAll) {
+ if (mq.getQueueId() == prevMqId) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private boolean isBalanced(TreeSet sortedCurrentSubscriptions) {
+ int min = this.messageQueueAllocation.get(sortedCurrentSubscriptions.first()).size();
+ int max = this.messageQueueAllocation.get(sortedCurrentSubscriptions.last()).size();
+ // if minimum and maximum numbers of message queues allocated to consumers differ by at most 1
+ return min >= max - 1;
+ }
+
+ /**
+ * @return The balance score of the given allocation, which is the sum of assigned queues size difference of all
+ * consumer. A well balanced allocation with balance score of 0 (all consumers getting the same number of
+ * allocations). Lower balance score represents a more balanced allocation.
+ */
+ private int getBalanceScore(Map> allocation) {
+ int score = 0;
+
+ Map consumerAllocationSizes = new HashMap();
+ for (Map.Entry> entry : allocation.entrySet()) {
+ consumerAllocationSizes.put(entry.getKey(), entry.getValue().size());
+ }
+
+ Iterator> it = consumerAllocationSizes.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry entry = it.next();
+ int consumerAllocationSize = entry.getValue();
+ it.remove();
+ for (Map.Entry otherEntry : consumerAllocationSizes.entrySet()) {
+ score += Math.abs(consumerAllocationSize - otherEntry.getValue());
+ }
+ }
+
+ return score;
+ }
+
+ private void deepCopy(Map> source, Map> dest) {
+ dest.clear();
+ for (Map.Entry> entry : source.entrySet()) {
+ dest.put(entry.getKey(), new ArrayList(entry.getValue()));
+ }
+ }
+
+ private List getPrevMqAll() {
+ List prevMqAll = new ArrayList();
+ for (List queues : messageQueueAllocation.values()) {
+ prevMqAll.addAll(queues);
+ }
+ return prevMqAll;
+ }
+
+ @Override
+ public String getName() {
+ return "STICKY";
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueStrategyConstants.java b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueStrategyConstants.java
new file mode 100644
index 00000000..bbfefb32
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueStrategyConstants.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.rebalance;
+
+public class AllocateMessageQueueStrategyConstants {
+
+ public static final String ALLOCATE_MACHINE_ROOM_NEARBY = "MACHINE_ROOM_NEARBY";
+
+ public static final String ALLOCATE_MESSAGE_QUEUE_AVERAGELY = "AVG";
+
+ public static final String ALLOCATE_MESSAGE_QUEUE_AVERAGELY_BY_CIRCLE = "AVG_BY_CIRCLE";
+
+ public static final String ALLOCATE_MESSAGE_QUEUE_BY_CONFIG = "CONFIG";
+
+ public static final String ALLOCATE_MESSAGE_QUEUE_BY_MACHINE_ROOM = "MACHINE_ROOM";
+
+ public static final String ALLOCATE_MESSAGE_QUEUE_CONSISTENT_HASH = "CONSISTENT_HASH";
+
+ public static final String ALLOCATE_MESSAGE_QUEUE_STICKY = "STICKY";
+}
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/AclClient.java b/example/src/main/java/org/apache/rocketmq/example/simple/AclClient.java
index 0c97cd33..0e674efa 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/AclClient.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/AclClient.java
@@ -29,7 +29,6 @@ import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
-import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
@@ -37,6 +36,7 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java
index a9b9ab03..5e5d4461 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java
@@ -23,10 +23,10 @@ import java.util.Set;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
-import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
--
GitLab