diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 194f2850fa57274d6a620c72c403ba1c5f1868dd..a39457a8a9b9fa5fff88876ae64729e8104ab3fe 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -73,6 +73,7 @@ import org.apache.rocketmq.broker.transaction.queue.DefaultTransactionalMessageC import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge; import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl; import org.apache.rocketmq.broker.util.ServiceProvider; +import org.apache.rocketmq.common.AllocateMessageQueueStrategy; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.Configuration; import org.apache.rocketmq.common.DataVersion; @@ -143,6 +144,8 @@ public class BrokerController { private final BrokerStatsManager brokerStatsManager; private final List sendMessageHookList = new ArrayList(); private final List consumeMessageHookList = new ArrayList(); + private final ConcurrentMap allocateMessageQueueStrategyTable + = new ConcurrentHashMap(); private MessageStore messageStore; private RemotingServer remotingServer; private RemotingServer fastRemotingServer; @@ -605,10 +608,12 @@ public class BrokerController { this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor); this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); + this.remotingServer.registerProcessor(RequestCode.ALLOCATE_MESSAGE_QUEUE, consumerManageProcessor, this.consumerManageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); + this.fastRemotingServer.registerProcessor(RequestCode.ALLOCATE_MESSAGE_QUEUE, consumerManageProcessor, this.consumerManageExecutor); /** * EndTransactionProcessor @@ -741,6 +746,10 @@ public class BrokerController { return subscriptionGroupManager; } + public ConcurrentMap getAllocateMessageQueueStrategyTable() { + return allocateMessageQueueStrategyTable; + } + public void shutdown() { if (this.brokerStatsManager != null) { this.brokerStatsManager.shutdown(); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java index 77317a6ffe7cea3d3e9c4e1d27a13f15cbec04ab..bc12a38df0434dfa02bca3968bc4a58d0213cbb1 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java @@ -18,12 +18,18 @@ package org.apache.rocketmq.broker.processor; import io.netty.channel.ChannelHandlerContext; import java.util.List; +import java.util.Map; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.client.ConsumerGroupInfo; +import org.apache.rocketmq.common.AllocateMessageQueueStrategy; import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.body.AllocateMessageQueueRequestBody; +import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueRequestHeader; +import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueResponseBody; +import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueResponseHeader; import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader; import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseBody; import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseHeader; @@ -31,6 +37,14 @@ import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHead import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader; import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader; import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHeader; +import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragely; +import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragelyByCircle; +import org.apache.rocketmq.common.rebalance.AllocateMessageQueueByConfig; +import org.apache.rocketmq.common.rebalance.AllocateMessageQueueByMachineRoom; +import org.apache.rocketmq.common.rebalance.AllocateMessageQueueConsistentHash; +import org.apache.rocketmq.common.rebalance.AllocateMessageQueueSticky; +import org.apache.rocketmq.common.rebalance.AllocateMessageQueueStrategyConstants; +import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingCommandException; @@ -57,6 +71,8 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen return this.updateConsumerOffset(ctx, request); case RequestCode.QUERY_CONSUMER_OFFSET: return this.queryConsumerOffset(ctx, request); + case RequestCode.ALLOCATE_MESSAGE_QUEUE: + return this.allocateMessageQueue(ctx, request); default: break; } @@ -152,4 +168,73 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen return response; } + + private synchronized RemotingCommand allocateMessageQueue(ChannelHandlerContext ctx, RemotingCommand request) + throws RemotingCommandException { + final RemotingCommand response = + RemotingCommand.createResponseCommand(AllocateMessageQueueResponseHeader.class); + final AllocateMessageQueueRequestHeader requestHeader = + (AllocateMessageQueueRequestHeader) request.decodeCommandCustomHeader(AllocateMessageQueueRequestHeader.class); + final AllocateMessageQueueRequestBody requestBody = AllocateMessageQueueRequestBody.decode(request.getBody(), + AllocateMessageQueueRequestBody.class); + + AllocateMessageQueueStrategy strategy = null; + String consumerGroup = requestHeader.getConsumerGroup(); + String strategyName = requestHeader.getStrategyName(); + Map strategyTable = this.brokerController.getAllocateMessageQueueStrategyTable(); + + if (strategyTable.containsKey(consumerGroup) && strategyName.equals(strategyTable.get(consumerGroup).getName())) { + strategy = strategyTable.get(consumerGroup); + } else { + switch (strategyName) { + case AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_AVERAGELY: + strategy = new AllocateMessageQueueAveragely(); + break; + case AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_AVERAGELY_BY_CIRCLE: + strategy = new AllocateMessageQueueAveragelyByCircle(); + break; + case AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_BY_CONFIG: + strategy = new AllocateMessageQueueByConfig(); + break; + case AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_BY_MACHINE_ROOM: + strategy = new AllocateMessageQueueByMachineRoom(); + break; + case AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_CONSISTENT_HASH: + strategy = new AllocateMessageQueueConsistentHash(); + break; + case AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_STICKY: + strategy = new AllocateMessageQueueSticky(); + break; + default: + response.setCode(ResponseCode.ALLOCATE_MESSAGE_QUEUE_FAILED); + response.setRemark("AllocateMessageQueueStrategy[" + strategyName + "] is not supported by broker"); + return response; + } + strategyTable.put(consumerGroup, strategy); + } + + ConsumerGroupInfo consumerGroupInfo = this.brokerController.getConsumerManager().getConsumerGroupInfo(consumerGroup); + List allocateResult = null; + try { + allocateResult = strategy.allocate( + requestHeader.getConsumerGroup(), + requestHeader.getClientID(), + requestBody.getMqAll(), + consumerGroupInfo != null ? consumerGroupInfo.getAllClientId() : null); + } catch (Throwable e) { + log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", + strategy.getName(), e); + response.setCode(ResponseCode.ALLOCATE_MESSAGE_QUEUE_FAILED); + response.setRemark(e.getMessage()); + return response; + } + + AllocateMessageQueueResponseBody body = new AllocateMessageQueueResponseBody(); + body.setAllocateResult(allocateResult); + response.setBody(body.encode()); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + + return response; + } } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessorTest.java new file mode 100644 index 0000000000000000000000000000000000000000..1cc51d0129a6a3699892b0f3de3556c7ba7f01b2 --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessorTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.processor; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.client.ClientChannelInfo; +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.body.AllocateMessageQueueRequestBody; +import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueRequestHeader; +import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData; +import org.apache.rocketmq.common.rebalance.AllocateMessageQueueStrategyConstants; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.netty.NettyClientConfig; +import org.apache.rocketmq.remoting.netty.NettyServerConfig; +import org.apache.rocketmq.remoting.protocol.LanguageCode; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.apache.rocketmq.broker.processor.PullMessageProcessorTest.createConsumerData; +import static org.assertj.core.api.Assertions.assertThat; + +@RunWith(MockitoJUnitRunner.class) +public class ConsumerManageProcessorTest { + private ConsumerManageProcessor consumerManageProcessor; + @Spy + private BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new MessageStoreConfig()); + @Mock + private ChannelHandlerContext handlerContext; + @Mock + private Channel channel; + + private ClientChannelInfo clientChannelInfo; + private String clientId = UUID.randomUUID().toString(); + private String group = "FooBarGroup"; + private String topic = "FooBar"; + private List mqAll = new ArrayList(); + + @Before + public void init() { + consumerManageProcessor = new ConsumerManageProcessor(brokerController); + clientChannelInfo = new ClientChannelInfo(channel, clientId, LanguageCode.JAVA, 100); + + mqAll.add(new MessageQueue(topic, brokerController.getBrokerConfig().getBrokerName(), 0)); + mqAll.add(new MessageQueue(topic, brokerController.getBrokerConfig().getBrokerName(), 1)); + mqAll.add(new MessageQueue(topic, brokerController.getBrokerConfig().getBrokerName(), 3)); + mqAll.add(new MessageQueue(topic, brokerController.getBrokerConfig().getBrokerName(), 4)); + + ConsumerData consumerData = createConsumerData(group, topic); + brokerController.getConsumerManager().registerConsumer( + consumerData.getGroupName(), + clientChannelInfo, + consumerData.getConsumeType(), + consumerData.getMessageModel(), + consumerData.getConsumeFromWhere(), + consumerData.getSubscriptionDataSet(), + false); + } + + @Test + public void testAllocateMessageQueue() throws RemotingCommandException { + String emptyClientId = ""; + RemotingCommand request = buildAllocateMessageQueueRequest(emptyClientId, AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_AVERAGELY); + RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.ALLOCATE_MESSAGE_QUEUE_FAILED); + assertThat(response.getRemark()).isEqualTo("currentCID is empty"); + + request = buildAllocateMessageQueueRequest(clientId, AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_AVERAGELY); + response = consumerManageProcessor.processRequest(handlerContext, request); + assertThat(response.getBody()).isNotNull(); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + + private RemotingCommand buildAllocateMessageQueueRequest(String clientId, String strategy) { + AllocateMessageQueueRequestHeader requestHeader = new AllocateMessageQueueRequestHeader(); + requestHeader.setConsumerGroup(group); + requestHeader.setClientID(clientId); + requestHeader.setStrategyName(strategy); + + AllocateMessageQueueRequestBody requestBody = new AllocateMessageQueueRequestBody(); + requestBody.setMqAll(mqAll); + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.ALLOCATE_MESSAGE_QUEUE, requestHeader); + request.setBody(requestBody.encode()); + request.makeCustomHeaderToNet(); + return request; + } +} diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java index c96f708e85463937ada305d8930c37dc4d83de9d..245af5347c8225288a62c2e8a9f9c2dd480e3c57 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java @@ -208,7 +208,7 @@ public class PullMessageProcessorTest { return request; } - static ConsumerData createConsumerData(String group, String topic) { + public static ConsumerData createConsumerData(String group, String topic) { ConsumerData consumerData = new ConsumerData(); consumerData.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumerData.setConsumeType(ConsumeType.CONSUME_PASSIVELY); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/rebalance/AllocateMessageQueueStickyTest.java b/broker/src/test/java/org/apache/rocketmq/broker/rebalance/AllocateMessageQueueStickyTest.java new file mode 100644 index 0000000000000000000000000000000000000000..e9082274a89e9f83c1805395be9a6183acd4ffe9 --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/rebalance/AllocateMessageQueueStickyTest.java @@ -0,0 +1,355 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.rebalance; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.TreeMap; +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.client.ClientChannelInfo; +import org.apache.rocketmq.broker.processor.ConsumerManageProcessor; +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.body.AllocateMessageQueueRequestBody; +import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueRequestHeader; +import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueResponseBody; +import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData; +import org.apache.rocketmq.common.rebalance.AllocateMessageQueueStrategyConstants; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.netty.NettyClientConfig; +import org.apache.rocketmq.remoting.netty.NettyServerConfig; +import org.apache.rocketmq.remoting.protocol.LanguageCode; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.apache.rocketmq.broker.processor.PullMessageProcessorTest.createConsumerData; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; + +@RunWith(MockitoJUnitRunner.class) +public class AllocateMessageQueueStickyTest { + private ConsumerManageProcessor consumerManageProcessor; + @Spy + private final BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new MessageStoreConfig()); + @Mock + private ChannelHandlerContext handlerContext; + + private static final String CID_PREFIX = "CID-"; + + private final String group = "FooBarGroup"; + private final String topic = "FooBar"; + private List messageQueueList; + private List consumerIdList; + + @Before + public void init() { + consumerManageProcessor = new ConsumerManageProcessor(brokerController); + messageQueueList = new ArrayList(); + consumerIdList = new ArrayList(); + } + + @Test + public void testCurrentCIDNotExists() throws RemotingCommandException { + String currentCID = String.valueOf(Integer.MAX_VALUE); + createConsumerIdList(4); + createMessageQueueList(15); + RemotingCommand request = buildAllocateMessageQueueRequest(currentCID, messageQueueList); + RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request); + Assert.assertEquals(AllocateMessageQueueRequestBody.decode(response.getBody(), + AllocateMessageQueueResponseBody.class).getAllocateResult().size(), 0); + } + + @Test + public void testCurrentCIDIllegalArgument() throws RemotingCommandException { + String currentCID = ""; + createConsumerIdList(4); + createMessageQueueList(15); + RemotingCommand request = buildAllocateMessageQueueRequest(currentCID, messageQueueList); + RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.ALLOCATE_MESSAGE_QUEUE_FAILED); + assertThat(response.getRemark()).isEqualTo("currentCID is empty"); + } + + @Test + public void testMessageQueueIllegalArgument() throws RemotingCommandException { + String currentCID = CID_PREFIX + 0; + createConsumerIdList(4); + createMessageQueueList(0); + RemotingCommand request = buildAllocateMessageQueueRequest(currentCID, messageQueueList); + RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.ALLOCATE_MESSAGE_QUEUE_FAILED); + assertThat(response.getRemark()).isEqualTo("mqAll is null or mqAll empty"); + } + + @Test + public void testConsumerIdIllegalArgument() throws RemotingCommandException { + String currentCID = CID_PREFIX + 0; + createConsumerIdList(0); + createMessageQueueList(15); + RemotingCommand request = buildAllocateMessageQueueRequest(currentCID, messageQueueList); + RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.ALLOCATE_MESSAGE_QUEUE_FAILED); + assertThat(response.getRemark()).isEqualTo("cidAll is null or cidAll empty"); + } + + @Test + public void testAllocateMessageQueue1() throws RemotingCommandException { + createConsumerIdList(4); + createMessageQueueList(10); + for (String clientId : consumerIdList) { + RemotingCommand request = buildAllocateMessageQueueRequest(clientId, messageQueueList); + RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request); + assertThat(response.getBody()).isNotNull(); + System.out.println(AllocateMessageQueueRequestBody.decode(response.getBody(), + AllocateMessageQueueResponseBody.class).getAllocateResult()); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + } + + @Test + public void testAllocateMessageQueue2() throws RemotingCommandException { + createConsumerIdList(10); + createMessageQueueList(4); + for (String clientId : consumerIdList) { + RemotingCommand request = buildAllocateMessageQueueRequest(clientId, messageQueueList); + RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request); + assertThat(response.getBody()).isNotNull(); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + } + + @Test + public void testRun10RandomCase() throws RemotingCommandException { + for (int i = 0; i < 10; i++) { + int consumerSize = new Random().nextInt(20) + 2; + int queueSize = new Random().nextInt(20) + 4; + testAllocateMessageQueue(consumerSize, queueSize); + try { + Thread.sleep(1); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + private void testAllocateMessageQueue(int consumerSize, int queueSize) throws RemotingCommandException { + createConsumerIdList(consumerSize); + createMessageQueueList(queueSize); + + Map allocateToAllOrigin = new TreeMap(); + List allocatedResAll = new ArrayList(); + // test allocate all + { + List cidBegin = new ArrayList(consumerIdList); + for (String cid : cidBegin) { + RemotingCommand request = buildAllocateMessageQueueRequest(cid, messageQueueList); + RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request); + assertThat(response.getBody()).isNotNull(); + List rs = AllocateMessageQueueRequestBody.decode(response.getBody(), + AllocateMessageQueueResponseBody.class).getAllocateResult(); + for (MessageQueue mq : rs) { + allocateToAllOrigin.put(mq, cid); + } + allocatedResAll.addAll(rs); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + + Assert.assertTrue(verifyAllocateAllConsumer(cidBegin, messageQueueList, allocatedResAll)); + } + + Map allocateToAllAfterRemoveOneConsumer = new TreeMap(); + List cidAfterRemoveOneConsumer = new ArrayList(consumerIdList); + // test allocate after removing one cid + { + String removeCID = cidAfterRemoveOneConsumer.remove(0); + unregisterConsumer(removeCID); + List mqShouldBeChanged = new ArrayList(); + for (Map.Entry entry : allocateToAllOrigin.entrySet()) { + if (entry.getValue().equals(removeCID)) { + mqShouldBeChanged.add(entry.getKey()); + } + } + + List allocatedResAllAfterRemoveOneConsumer = new ArrayList(); + for (String cid : cidAfterRemoveOneConsumer) { + RemotingCommand request = buildAllocateMessageQueueRequest(cid, messageQueueList); + RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request); + assertThat(response.getBody()).isNotNull(); + List rs = AllocateMessageQueueRequestBody.decode(response.getBody(), + AllocateMessageQueueResponseBody.class).getAllocateResult(); + allocatedResAllAfterRemoveOneConsumer.addAll(rs); + for (MessageQueue mq : rs) { + allocateToAllAfterRemoveOneConsumer.put(mq, cid); + } + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + + Assert.assertTrue(verifyAllocateAllConsumer(cidAfterRemoveOneConsumer, messageQueueList, allocatedResAllAfterRemoveOneConsumer)); + verifyAfterRemoveConsumer(allocateToAllOrigin, allocateToAllAfterRemoveOneConsumer, removeCID); + } + + Map allocateToAllAfterAddOneConsumer = new TreeMap(); + List cidAfterAddOneConsumer = new ArrayList(cidAfterRemoveOneConsumer); + // test allocate after adding one more cid + { + String newCid = CID_PREFIX + "NEW"; + cidAfterAddOneConsumer.add(newCid); + registerConsumer(newCid); + + List mqShouldOnlyChanged = new ArrayList(); + List allocatedResAllAfterAddOneConsumer = new ArrayList(); + for (String cid : cidAfterAddOneConsumer) { + RemotingCommand request = buildAllocateMessageQueueRequest(cid, messageQueueList); + RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request); + assertThat(response.getBody()).isNotNull(); + List rs = AllocateMessageQueueRequestBody.decode(response.getBody(), + AllocateMessageQueueResponseBody.class).getAllocateResult(); + allocatedResAllAfterAddOneConsumer.addAll(rs); + for (MessageQueue mq : rs) { + allocateToAllAfterAddOneConsumer.put(mq, cid); + if (cid.equals(newCid)) { + mqShouldOnlyChanged.add(mq); + } + } + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + + Assert.assertTrue(verifyAllocateAllConsumer(cidAfterAddOneConsumer, messageQueueList, allocatedResAllAfterAddOneConsumer)); + verifyAfterAddConsumer(allocateToAllAfterRemoveOneConsumer, allocateToAllAfterAddOneConsumer, newCid); + } + + Map allocateToAllAfterRemoveTwoMq = new TreeMap(); + List mqAfterRemoveTwoMq = new ArrayList<>(messageQueueList); + // test allocate after removing two message queues + { + for (int i = 0; i < 2; i++) { + mqAfterRemoveTwoMq.remove(i); + } + + List allocatedResAfterRemoveTwoMq = new ArrayList(); + for (String cid : cidAfterAddOneConsumer) { + RemotingCommand request = buildAllocateMessageQueueRequest(cid, mqAfterRemoveTwoMq); + RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request); + assertThat(response.getBody()).isNotNull(); + List rs = AllocateMessageQueueRequestBody.decode(response.getBody(), + AllocateMessageQueueResponseBody.class).getAllocateResult(); + allocatedResAfterRemoveTwoMq.addAll(rs); + for (MessageQueue mq : rs) { + allocateToAllAfterRemoveTwoMq.put(mq, cid); + } + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + + Assert.assertTrue(verifyAllocateAllConsumer(cidAfterAddOneConsumer, mqAfterRemoveTwoMq, allocatedResAfterRemoveTwoMq)); + } + } + + private void verifyAfterAddConsumer(Map allocateBefore, + Map allocateAfter, String newCID) { + for (MessageQueue mq : allocateAfter.keySet()) { + String allocateToOrigin = allocateBefore.get(mq); + String allocateToAfter = allocateAfter.get(mq); + if (!allocateToAfter.equals(newCID)) { // the rest queues should be the same + Assert.assertEquals(allocateToAfter, allocateToOrigin); + } + } + } + + private void verifyAfterRemoveConsumer(Map allocateToBefore, + Map allocateAfter, String removeCID) { + for (MessageQueue mq : allocateToBefore.keySet()) { + String allocateToOrigin = allocateToBefore.get(mq); + String allocateToAfter = allocateAfter.get(mq); + if (!allocateToOrigin.equals(removeCID)) { // the rest queues should be the same + Assert.assertEquals(allocateToAfter, allocateToOrigin); // should be the same + } + } + } + + private boolean verifyAllocateAllConsumer(List cidAll, List mqAll, + List allocatedResAll) { + if (cidAll.isEmpty()) { + return allocatedResAll.isEmpty(); + } + return mqAll.containsAll(allocatedResAll) && allocatedResAll.containsAll(mqAll); + } + + private void registerConsumer(String clientId) { + Channel channel = mock(Channel.class); + ClientChannelInfo clientChannelInfo = new ClientChannelInfo(channel, clientId, LanguageCode.JAVA, 100); + ConsumerData consumerData = createConsumerData(group, topic); + brokerController.getConsumerManager().registerConsumer( + consumerData.getGroupName(), + clientChannelInfo, + consumerData.getConsumeType(), + consumerData.getMessageModel(), + consumerData.getConsumeFromWhere(), + consumerData.getSubscriptionDataSet(), + false); + consumerIdList.add(clientId); + } + + private void unregisterConsumer(String clientId) { + Channel channel = brokerController.getConsumerManager().getConsumerGroupInfo(group).findChannel(clientId).getChannel(); + brokerController.getConsumerManager().unregisterConsumer(group, + new ClientChannelInfo(channel, clientId, LanguageCode.JAVA, 100), true); + consumerIdList.remove(clientId); + } + + private RemotingCommand buildAllocateMessageQueueRequest(String clientId, List messageQueueList) { + AllocateMessageQueueRequestHeader requestHeader = new AllocateMessageQueueRequestHeader(); + requestHeader.setConsumerGroup(group); + requestHeader.setClientID(clientId); + requestHeader.setStrategyName(AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_STICKY); + + AllocateMessageQueueRequestBody requestBody = new AllocateMessageQueueRequestBody(); + requestBody.setMqAll(messageQueueList); + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.ALLOCATE_MESSAGE_QUEUE, requestHeader); + request.setBody(requestBody.encode()); + request.makeCustomHeaderToNet(); + + return request; + } + + private void createConsumerIdList(int size) { + for (int i = 0; i < size; i++) { + String clientId = CID_PREFIX + i; + registerConsumer(clientId); + } + } + + private void createMessageQueueList(int size) { + for (int i = 0; i < size; i++) { + MessageQueue mq = new MessageQueue(topic, "brokerName", i); + messageQueueList.add(mq); + } + } +} diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java index 6718eb55fd8730ac9ff0187f4f0b54fc48b932fa..3ec19f4f793cdde605489bc0c4b4a9720322ccda 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java @@ -19,10 +19,10 @@ package org.apache.rocketmq.client.consumer; import java.util.Collection; import java.util.List; import org.apache.rocketmq.client.ClientConfig; -import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; import org.apache.rocketmq.client.consumer.store.OffsetStore; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl; +import org.apache.rocketmq.common.AllocateMessageQueueStrategy; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; @@ -30,6 +30,7 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.NamespaceUtil; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragely; import org.apache.rocketmq.remoting.RPCHook; public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer { @@ -66,6 +67,11 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon * Consumption pattern,default is clustering */ private MessageModel messageModel = MessageModel.CLUSTERING; + + /** + * The switch for applying the rebalancing calculation task at the broker side + */ + private boolean rebalanceByBroker = false; /** * Message queue listener */ @@ -409,6 +415,14 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon this.messageModel = messageModel; } + public boolean isRebalanceByBroker() { + return rebalanceByBroker; + } + + public void setRebalanceByBroker(boolean rebalanceByBroker) { + this.rebalanceByBroker = rebalanceByBroker; + } + public String getConsumerGroup() { return consumerGroup; } diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java index 0876a94e4c536b9691840373c5daa2a2c6aa9463..b9069dcad4877f4fb8b5e429ed4b34f649651cde 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java @@ -20,17 +20,18 @@ import java.util.HashSet; import java.util.Set; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.QueryResult; -import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; import org.apache.rocketmq.client.consumer.store.OffsetStore; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl; +import org.apache.rocketmq.common.AllocateMessageQueueStrategy; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.NamespaceUtil; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragely; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.exception.RemotingException; @@ -65,6 +66,10 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume * Consumption pattern,default is clustering */ private MessageModel messageModel = MessageModel.CLUSTERING; + /** + * The switch for applying the rebalancing calculation task at the broker side + */ + private boolean rebalanceByBroker = false; /** * Message queue listener */ @@ -245,6 +250,14 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume this.messageModel = messageModel; } + public boolean isRebalanceByBroker() { + return rebalanceByBroker; + } + + public void setRebalanceByBroker(boolean rebalanceByBroker) { + this.rebalanceByBroker = rebalanceByBroker; + } + public MessageQueueListener getMessageQueueListener() { return messageQueueListener; } diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java index 9011117a79fc247800c3c9aa06d58c5c97bb01a8..743dda76dc2e43f88986ec7b9e077e5176195944 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java @@ -24,7 +24,6 @@ import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.consumer.listener.MessageListener; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; -import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; import org.apache.rocketmq.client.consumer.store.OffsetStore; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; @@ -33,6 +32,7 @@ import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.trace.AsyncTraceDispatcher; import org.apache.rocketmq.client.trace.TraceDispatcher; import org.apache.rocketmq.client.trace.hook.ConsumeMessageTraceHookImpl; +import org.apache.rocketmq.common.AllocateMessageQueueStrategy; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; @@ -41,6 +41,7 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.NamespaceUtil; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragely; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.exception.RemotingException; @@ -92,6 +93,19 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume */ private MessageModel messageModel = MessageModel.CLUSTERING; + /** + * The switch for applying the rebalancing calculation task at the broker side. + *

+ * + * RocketMQ supports two message models: clustering and broadcasting. If clustering is set, consumer clients with + * the same {@link #consumerGroup} would execute rebalancing calculations at the client side in default. The switch + * is responsible for shifting the rebalancing calculation task to the broker side. + *

+ * + * This field defaults to false. + */ + private boolean rebalanceByBroker = false; + /** * Consuming point on consumer booting. *

@@ -574,6 +588,14 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume this.messageModel = messageModel; } + public boolean isRebalanceByBroker() { + return rebalanceByBroker; + } + + public void setRebalanceByBroker(boolean rebalanceByBroker) { + this.rebalanceByBroker = rebalanceByBroker; + } + public int getPullBatchSize() { return pullBatchSize; } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index c64d7c568ec4c250fdee1c6c5749f65ca9d3d3fb..6ebb8ad116b282765f034ef658fb578248c1d0fc 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -63,6 +63,7 @@ import org.apache.rocketmq.common.namesrv.TopAddressing; import org.apache.rocketmq.common.protocol.NamespaceUtil; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.body.AllocateMessageQueueRequestBody; import org.apache.rocketmq.common.protocol.body.BrokerStatsData; import org.apache.rocketmq.common.protocol.body.CheckClientRequestBody; import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo; @@ -86,6 +87,8 @@ import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.common.protocol.body.TopicList; import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody; +import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueRequestHeader; +import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueResponseBody; import org.apache.rocketmq.common.protocol.header.CloneGroupOffsetRequestHeader; import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader; import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader; @@ -901,6 +904,39 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } + public List getAllocateResultByStrategy(final String addr, final String group, final String clientId, + final String strategyName, final List mqAll, final long timeoutMillis) + throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, + MQBrokerException, InterruptedException { + AllocateMessageQueueRequestHeader requestHeader = new AllocateMessageQueueRequestHeader(); + requestHeader.setConsumerGroup(group); + requestHeader.setClientID(clientId); + requestHeader.setStrategyName(strategyName); + + AllocateMessageQueueRequestBody requestBody = new AllocateMessageQueueRequestBody(); + requestBody.setMqAll(mqAll); + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.ALLOCATE_MESSAGE_QUEUE, requestHeader); + request.setBody(requestBody.encode()); + + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), + request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + if (response.getBody() != null) { + AllocateMessageQueueResponseBody body = AllocateMessageQueueRequestBody.decode(response.getBody(), + AllocateMessageQueueResponseBody.class); + return body.getAllocateResult(); + } + } + default: + break; + } + + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + public long getMinOffset(final String addr, final String topic, final int queueId, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException { GetMinOffsetRequestHeader requestHeader = new GetMinOffsetRequestHeader(); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index e3d60ffadde60bd9e44c1af36735b0584998f78a..5d06df4d02bda8bf18f85734c086d0a28d14f54f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -873,6 +873,11 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { return this.defaultLitePullConsumer.getMessageModel(); } + @Override + public boolean rebalanceByBroker() { + return this.defaultLitePullConsumer.isRebalanceByBroker(); + } + @Override public ConsumeType consumeType() { return ConsumeType.CONSUME_ACTIVELY; diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java index afd72a08002c6c9124feb6260f011e9810407842..8bd1c311b231934b3225904d7a8476ff05c55f26 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java @@ -345,6 +345,11 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { return this.defaultMQPullConsumer.getMessageModel(); } + @Override + public boolean rebalanceByBroker() { + return this.defaultMQPullConsumer.isRebalanceByBroker(); + } + @Override public ConsumeType consumeType() { return ConsumeType.CONSUME_ACTIVELY; diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index ab585ea4c98e3592e6058a123d78cfc4f48bfe3d..afefd97125866b30b8ce619d96080e7074ffa943 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -990,6 +990,11 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { return this.defaultMQPushConsumer.getMessageModel(); } + @Override + public boolean rebalanceByBroker() { + return this.defaultMQPushConsumer.isRebalanceByBroker(); + } + @Override public ConsumeType consumeType() { return ConsumeType.CONSUME_PASSIVELY; diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java index c2e8a1dfc49555c8147928ea9eb5a9143d0777c9..242820f0c55ad0a287be6793ac0ef414d4b23a9c 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java @@ -32,6 +32,8 @@ public interface MQConsumerInner { MessageModel messageModel(); + boolean rebalanceByBroker(); + ConsumeType consumeType(); ConsumeFromWhere consumeFromWhere(); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java index b8972a92e8fdb3402b92c7c1b1a8015cb38a127b..a5a22fe5a5ffb9527285bb83a01f2d95a0aa662e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java @@ -27,18 +27,19 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; import org.apache.rocketmq.client.impl.FindBrokerResult; import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.common.AllocateMessageQueueStrategy; import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody; import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.common.rebalance.AllocateMessageQueueStrategyConstants; +import org.apache.rocketmq.logging.InternalLogger; public abstract class RebalanceImpl { protected static final InternalLogger log = ClientLogger.getLog(); @@ -277,16 +278,30 @@ public abstract class RebalanceImpl { AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; List allocateResult = null; - try { - allocateResult = strategy.allocate( + + if (isRebalanceByBroker()) { + allocateResult = this.mQClientFactory.getAllocateResult( + topic, this.consumerGroup, - this.mQClientFactory.getClientId(), - mqAll, - cidAll); - } catch (Throwable e) { - log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(), - e); - return; + strategy.getName(), + mqAll); + } else { + if (strategy.getName().equals(AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_STICKY)) { + log.error("AllocateMessageQueueStrategy is not supported while rebalanceByBroker=false. allocateMessageQueueStrategyName={}", + strategy.getName()); + return; + } + try { + allocateResult = strategy.allocate( + this.consumerGroup, + this.mQClientFactory.getClientId(), + mqAll, + cidAll); + } catch (Throwable e) { + log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(), + e); + return; + } } Set allocateResultSet = new HashSet(); @@ -412,6 +427,8 @@ public abstract class RebalanceImpl { public abstract void dispatchPullRequest(final List pullRequestList); + public abstract boolean isRebalanceByBroker(); + public void removeProcessQueue(final MessageQueue mq) { ProcessQueue prev = this.processQueueTable.remove(mq); if (prev != null) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java index 9d1ea7492eea2ffc662defea11f4fe5d1bf84f38..31c3fdaf57c19ff55f7374ddcec4440922ffdcfe 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java @@ -18,11 +18,11 @@ package org.apache.rocketmq.client.impl.consumer; import java.util.List; import java.util.Set; -import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; import org.apache.rocketmq.client.consumer.MessageQueueListener; import org.apache.rocketmq.client.consumer.store.ReadOffsetType; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.common.AllocateMessageQueueStrategy; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; @@ -142,4 +142,9 @@ public class RebalanceLitePullImpl extends RebalanceImpl { public void dispatchPullRequest(List pullRequestList) { } + @Override + public boolean isRebalanceByBroker() { + return this.litePullConsumerImpl.rebalanceByBroker(); + } + } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java index 9dd408c1d140ea4f7ce4aa31df5e5181c997b753..e9373f6f386549e4d035cf803c2a191d6c94d1b5 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java @@ -18,9 +18,9 @@ package org.apache.rocketmq.client.impl.consumer; import java.util.List; import java.util.Set; -import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; import org.apache.rocketmq.client.consumer.MessageQueueListener; import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.common.AllocateMessageQueueStrategy; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; @@ -76,4 +76,9 @@ public class RebalancePullImpl extends RebalanceImpl { @Override public void dispatchPullRequest(List pullRequestList) { } + + @Override + public boolean isRebalanceByBroker() { + return this.defaultMQPullConsumerImpl.rebalanceByBroker(); + } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java index e5166f35b59c003d4512f6f2e64965713ea8d051..8341480dc0df93cbd79685e3a88bbbe1729df95f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java @@ -19,11 +19,11 @@ package org.apache.rocketmq.client.impl.consumer; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; -import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; import org.apache.rocketmq.client.consumer.store.OffsetStore; import org.apache.rocketmq.client.consumer.store.ReadOffsetType; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.common.AllocateMessageQueueStrategy; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; @@ -218,4 +218,9 @@ public class RebalancePushImpl extends RebalanceImpl { log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest); } } + + @Override + public boolean isRebalanceByBroker() { + return this.defaultMQPushConsumerImpl.rebalanceByBroker(); + } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index e937ce39bb8f0a60e68d413609df155da36dc195..aad513429c94d4888f7af412c4862a694e07e2cc 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.client.impl.factory; import java.io.UnsupportedEncodingException; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -442,7 +443,7 @@ public class MQClientInstance { } // may need to check one broker every cluster... // assume that the configs of every broker in cluster are the the same. - String addr = findBrokerAddrByTopic(subscriptionData.getTopic()); + String addr = findRandomBrokerAddrByTopic(subscriptionData.getTopic()); if (addr != null) { try { @@ -1069,10 +1070,10 @@ public class MQClientInstance { } public List findConsumerIdList(final String topic, final String group) { - String brokerAddr = this.findBrokerAddrByTopic(topic); + String brokerAddr = this.findRandomBrokerAddrByTopic(topic); if (null == brokerAddr) { this.updateTopicRouteInfoFromNameServer(topic); - brokerAddr = this.findBrokerAddrByTopic(topic); + brokerAddr = this.findRandomBrokerAddrByTopic(topic); } if (null != brokerAddr) { @@ -1086,7 +1087,7 @@ public class MQClientInstance { return null; } - public String findBrokerAddrByTopic(final String topic) { + public String findRandomBrokerAddrByTopic(final String topic) { TopicRouteData topicRouteData = this.topicRouteTable.get(topic); if (topicRouteData != null) { List brokers = topicRouteData.getBrokerDatas(); @@ -1100,6 +1101,24 @@ public class MQClientInstance { return null; } + public String findUniqueBrokerAddrByTopic(final String topic) { + TopicRouteData topicRouteData = this.topicRouteTable.get(topic); + if (topicRouteData != null) { + List brokers = topicRouteData.getBrokerDatas(); + if (!brokers.isEmpty()) { + Collections.sort(brokers, new Comparator() { + @Override public int compare(BrokerData o1, BrokerData o2) { + return o1.getBrokerName().compareTo(o2.getBrokerName()); + } + }); + BrokerData bd = brokers.get(0); + return bd.selectBrokerAddr(); + } + } + + return null; + } + public void resetOffset(String topic, String group, Map offsetTable) { DefaultMQPushConsumerImpl consumer = null; try { @@ -1161,6 +1180,26 @@ public class MQClientInstance { } } + public List getAllocateResult(final String topic, final String group, final String strategyName, + final List mqAll) { + String brokerAddr = this.findUniqueBrokerAddrByTopic(topic); + if (null == brokerAddr) { + this.updateTopicRouteInfoFromNameServer(topic); + brokerAddr = this.findUniqueBrokerAddrByTopic(topic); + } + + if (null != brokerAddr) { + try { + return this.mQClientAPIImpl.getAllocateResultByStrategy(brokerAddr, group, clientId, strategyName, + mqAll, 3000); + } catch (Exception e) { + log.warn("getAllocateResultByStrategy exception, {} {}", brokerAddr, group, e); + } + } + + return null; + } + public TopicRouteData getAnExistTopicRouteData(final String topic) { return this.topicRouteTable.get(topic); } diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearByTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearByTest.java index 0d394c382311fb2140aaa655443677d5b6a5902c..755ad905806bb644f5f4f21f1c0dd3dbfa4cf8dc 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearByTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearByTest.java @@ -23,8 +23,10 @@ import java.util.Random; import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; -import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; +import org.apache.rocketmq.common.AllocateMessageQueueStrategy; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.rebalance.AllocateMachineRoomNearby; +import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragely; import org.junit.Assert; import org.junit.Before; import org.junit.Test; diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java index 98ce7b6eb276ad38a8db86666792a175dca50d78..aa9cfba3cfb7f509979502b257b43bd21e6ec02c 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java @@ -22,8 +22,9 @@ import java.util.List; import java.util.Map; import java.util.Random; import java.util.TreeMap; -import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; +import org.apache.rocketmq.common.AllocateMessageQueueStrategy; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.rebalance.AllocateMessageQueueConsistentHash; import org.junit.Assert; import org.junit.Before; import org.junit.Test; diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java index 796a3943087247c9e5fc75aa5b8fc1cc47c35fb0..68260c123526c8c1120b369650285ca2d4156e34 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java @@ -20,13 +20,13 @@ import java.util.Collections; import java.util.HashSet; import java.util.Set; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; -import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; import org.apache.rocketmq.client.consumer.store.OffsetStore; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragely; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/AllocateMessageQueueStrategy.java b/common/src/main/java/org/apache/rocketmq/common/AllocateMessageQueueStrategy.java similarity index 97% rename from client/src/main/java/org/apache/rocketmq/client/consumer/AllocateMessageQueueStrategy.java rename to common/src/main/java/org/apache/rocketmq/common/AllocateMessageQueueStrategy.java index c1f060406b54818b85b6bd767e6708fe49b5681b..e467609719c8a810c508d73ff5eec6616d4d771f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/AllocateMessageQueueStrategy.java +++ b/common/src/main/java/org/apache/rocketmq/common/AllocateMessageQueueStrategy.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.client.consumer; +package org.apache.rocketmq.common; import java.util.List; import org.apache.rocketmq.common.message.MessageQueue; diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java index 75ceff38cfb9b8bbd086007be095012c0d076f8d..90642c2ed1c068bff7d208bea3c070179e52e6eb 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java @@ -33,6 +33,8 @@ public class RequestCode { public static final int GET_TOPIC_NAME_LIST = 23; + public static final int ALLOCATE_MESSAGE_QUEUE = 24; + public static final int UPDATE_BROKER_CONFIG = 25; public static final int GET_BROKER_CONFIG = 26; diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java index dc744448f6cf6999f1b8cb79860c8d916de88610..0cef0af9357d31321806c140dadd208cc2014707 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java @@ -80,4 +80,5 @@ public class ResponseCode extends RemotingSysResponseCode { public static final int UPDATE_GLOBAL_WHITE_ADDRS_CONFIG_FAILED = 211; + public static final int ALLOCATE_MESSAGE_QUEUE_FAILED = 212; } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/AllocateMessageQueueRequestBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/AllocateMessageQueueRequestBody.java new file mode 100644 index 0000000000000000000000000000000000000000..3e051585af102d3f096701c53c4cc769b01584bc --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/AllocateMessageQueueRequestBody.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.body; + +import java.util.List; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + +public class AllocateMessageQueueRequestBody extends RemotingSerializable { + private List mqAll; + + public List getMqAll() { + return mqAll; + } + + public void setMqAll(List mqAll) { + this.mqAll = mqAll; + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/AllocateMessageQueueRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/AllocateMessageQueueRequestHeader.java new file mode 100644 index 0000000000000000000000000000000000000000..4c0c11ef5c5ce16fa9e220f38423254d07a28920 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/AllocateMessageQueueRequestHeader.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.header; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public class AllocateMessageQueueRequestHeader implements CommandCustomHeader { + @CFNotNull + private String consumerGroup; + @CFNotNull + private String clientID; + @CFNotNull + private String strategyName; + + @Override public void checkFields() throws RemotingCommandException { + } + + public String getConsumerGroup() { + return consumerGroup; + } + + public void setConsumerGroup(String consumerGroup) { + this.consumerGroup = consumerGroup; + } + + public String getClientID() { + return clientID; + } + + public void setClientID(String clientID) { + this.clientID = clientID; + } + + public String getStrategyName() { + return strategyName; + } + + public void setStrategyName(String strategyName) { + this.strategyName = strategyName; + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/AllocateMessageQueueResponseBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/AllocateMessageQueueResponseBody.java new file mode 100644 index 0000000000000000000000000000000000000000..a1d4f478c91ecd3bd51429a056ec91f13d7724a3 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/AllocateMessageQueueResponseBody.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.header; + +import java.util.List; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + +public class AllocateMessageQueueResponseBody extends RemotingSerializable { + private List allocateResult; + + public List getAllocateResult() { + return allocateResult; + } + + public void setAllocateResult(List allocateResult) { + this.allocateResult = allocateResult; + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/AllocateMessageQueueResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/AllocateMessageQueueResponseHeader.java new file mode 100644 index 0000000000000000000000000000000000000000..7fc460b9924cdea9e4009e7e60c5c909ae2bed60 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/AllocateMessageQueueResponseHeader.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.header; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public class AllocateMessageQueueResponseHeader implements CommandCustomHeader { + + @Override public void checkFields() throws RemotingCommandException { + } +} diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearby.java b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMachineRoomNearby.java similarity index 94% rename from client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearby.java rename to common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMachineRoomNearby.java index ec0f7f6489d621ff09747f9bb591827dab45fb76..4750ba21d9a34261234a9f6993436b6bf2e6c363 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearby.java +++ b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMachineRoomNearby.java @@ -14,17 +14,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.client.consumer.rebalance; +package org.apache.rocketmq.common.rebalance; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.TreeMap; import org.apache.commons.lang3.StringUtils; -import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; -import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.common.AllocateMessageQueueStrategy; +import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; /** * An allocate strategy proxy for based on machine room nearside priority. An actual allocate strategy can be @@ -35,7 +36,7 @@ import org.apache.rocketmq.logging.InternalLogger; * no alive consumer to monopolize them. */ public class AllocateMachineRoomNearby implements AllocateMessageQueueStrategy { - private final InternalLogger log = ClientLogger.getLog(); + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); private final AllocateMessageQueueStrategy allocateMessageQueueStrategy;//actual allocate strategy private final MachineRoomResolver machineRoomResolver; diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueAveragely.java similarity index 88% rename from client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java rename to common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueAveragely.java index 155e692ad0bef0d7e351b6cc1a73f707067bcf0b..525bc821099676c0cd1e1372bb8fabc49495ebee 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java +++ b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueAveragely.java @@ -14,20 +14,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.client.consumer.rebalance; +package org.apache.rocketmq.common.rebalance; import java.util.ArrayList; import java.util.List; -import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; -import org.apache.rocketmq.client.log.ClientLogger; -import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.common.AllocateMessageQueueStrategy; +import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; /** * Average Hashing queue algorithm */ public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy { - private final InternalLogger log = ClientLogger.getLog(); + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); @Override public List allocate(String consumerGroup, String currentCID, List mqAll, diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueAveragelyByCircle.java similarity index 87% rename from client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java rename to common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueAveragelyByCircle.java index fe78f0a6bbf8f3e698c1c3a52ac2b6307b6a8ace..4bdfa45c0d068f72756619610e5dd5fd7f1554fb 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java +++ b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueAveragelyByCircle.java @@ -14,20 +14,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.client.consumer.rebalance; +package org.apache.rocketmq.common.rebalance; import java.util.ArrayList; import java.util.List; -import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; -import org.apache.rocketmq.client.log.ClientLogger; -import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.common.AllocateMessageQueueStrategy; +import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; /** * Cycle average Hashing queue algorithm */ public class AllocateMessageQueueAveragelyByCircle implements AllocateMessageQueueStrategy { - private final InternalLogger log = ClientLogger.getLog(); + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); @Override public List allocate(String consumerGroup, String currentCID, List mqAll, diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueByConfig.java similarity index 92% rename from client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java rename to common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueByConfig.java index e548803d0d0a2786c504c6db9be8a93bd430d27a..a5aaf663d41f7781a281467c67a1b247fe7a2067 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueByConfig.java @@ -14,10 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.client.consumer.rebalance; +package org.apache.rocketmq.common.rebalance; import java.util.List; -import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; +import org.apache.rocketmq.common.AllocateMessageQueueStrategy; import org.apache.rocketmq.common.message.MessageQueue; public class AllocateMessageQueueByConfig implements AllocateMessageQueueStrategy { diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueByMachineRoom.java similarity index 95% rename from client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java rename to common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueByMachineRoom.java index 37568317cb016638ef5d56c23341051810ce8ec9..bac3dac48ab870103e111920b4283098c59060fb 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java +++ b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueByMachineRoom.java @@ -14,12 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.client.consumer.rebalance; +package org.apache.rocketmq.common.rebalance; import java.util.ArrayList; import java.util.List; import java.util.Set; -import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; +import org.apache.rocketmq.common.AllocateMessageQueueStrategy; import org.apache.rocketmq.common.message.MessageQueue; /** diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueConsistentHash.java similarity index 92% rename from client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java rename to common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueConsistentHash.java index 65dcf799271f4e9e6c2fa21bcbcd6403e21e6e9d..8baf7c7c73dfa0b92405d52a4913a390deecc963 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java +++ b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueConsistentHash.java @@ -14,24 +14,25 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.client.consumer.rebalance; +package org.apache.rocketmq.common.rebalance; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; -import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.common.AllocateMessageQueueStrategy; import org.apache.rocketmq.common.consistenthash.ConsistentHashRouter; import org.apache.rocketmq.common.consistenthash.HashFunction; import org.apache.rocketmq.common.consistenthash.Node; -import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; /** * Consistent Hashing queue algorithm */ public class AllocateMessageQueueConsistentHash implements AllocateMessageQueueStrategy { - private final InternalLogger log = ClientLogger.getLog(); + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); private final int virtualNodeCnt; private final HashFunction customHashFunction; diff --git a/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueSticky.java b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueSticky.java new file mode 100644 index 0000000000000000000000000000000000000000..06a9abae914c4697907e70d1efa1ed5e74eec159 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueSticky.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.rebalance; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TreeSet; +import org.apache.rocketmq.common.AllocateMessageQueueStrategy; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; + +public class AllocateMessageQueueSticky implements AllocateMessageQueueStrategy { + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); + + private final Map> messageQueueAllocation = new HashMap>(); + + private final List unassignedQueues = new ArrayList(); + + @Override + public List allocate(String consumerGroup, String currentCID, + List mqAll, final List cidAll) { + if (currentCID == null || currentCID.length() < 1) { + throw new IllegalArgumentException("currentCID is empty"); + } + if (mqAll == null || mqAll.isEmpty()) { + throw new IllegalArgumentException("mqAll is null or mqAll empty"); + } + if (cidAll == null || cidAll.isEmpty()) { + throw new IllegalArgumentException("cidAll is null or cidAll empty"); + } + + List result = new ArrayList(); + if (!cidAll.contains(currentCID)) { + log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}", + consumerGroup, + currentCID, + cidAll); + return result; + } + + // Rebalance (or fresh assignment) needed + if (cidAll.size() != messageQueueAllocation.size() || mqAll.size() != getPrevMqAll().size()) { + // Update messageQueueAllocation + updateMessageQueueAllocation(mqAll, cidAll); + + // Sort consumers based on how many message queues are assigned to them + TreeSet sortedSubscriptions = new TreeSet(new ConsumerComparator(messageQueueAllocation)); + sortedSubscriptions.addAll(messageQueueAllocation.keySet()); + + // Assign unassignedQueues to consumers so that queues allocations are as balanced as possible + for (MessageQueue mq : unassignedQueues) { + String consumer = sortedSubscriptions.first(); + sortedSubscriptions.remove(consumer); + messageQueueAllocation.get(consumer).add(mq); + sortedSubscriptions.add(consumer); + } + unassignedQueues.clear(); + + // Reassignment until no message queue can be moved to improve the balance + Map> preBalanceAllocation = new HashMap>(messageQueueAllocation); + while (!isBalanced(sortedSubscriptions)) { + String leastSubscribedConsumer = sortedSubscriptions.first(); + String mostSubscribedConsumer = sortedSubscriptions.last(); + MessageQueue mqFromMostSubscribedConsumer = messageQueueAllocation.get(mostSubscribedConsumer).get(0); + messageQueueAllocation.get(leastSubscribedConsumer).add(mqFromMostSubscribedConsumer); + messageQueueAllocation.get(mostSubscribedConsumer).remove(mqFromMostSubscribedConsumer); + } + + // Make sure it is getting a more balanced allocation than before; otherwise, revert to previous allocation + if (getBalanceScore(messageQueueAllocation) >= getBalanceScore(preBalanceAllocation)) { + deepCopy(preBalanceAllocation, messageQueueAllocation); + } + } + + return messageQueueAllocation.get(currentCID); + } + + private void updateMessageQueueAllocation(List mqAll, List cidAll) { + // The current size of consumers is larger than before + if (cidAll.size() > messageQueueAllocation.size()) { + for (String cid : cidAll) { + if (!messageQueueAllocation.containsKey(cid)) { + messageQueueAllocation.put(cid, new ArrayList()); + } + } + } + + // The current size of consumers is smaller than before + if (cidAll.size() < messageQueueAllocation.size()) { + Iterator>> it = messageQueueAllocation.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry> entry = it.next(); + if (!cidAll.contains(entry.getKey())) { + it.remove(); + } + } + } + + // The current size of message queues is larger than before + List prevMqAll = getPrevMqAll(); + if (mqAll.size() > prevMqAll.size()) { + for (MessageQueue mq : mqAll) { + if (!prevMqAll.contains(mq)) { + unassignedQueues.add(mq); + } + } + } + + // The current size of message queues is smaller than before + if (mqAll.size() < prevMqAll.size()) { + for (MessageQueue prevMq : prevMqAll) { + if (!isSameQueueIdExists(mqAll, prevMq.getQueueId())) { + for (List prevMqs : messageQueueAllocation.values()) { + prevMqs.remove(prevMq); + } + } + } + } + } + + private static class ConsumerComparator implements Comparator, Serializable { + private final Map> map; + + ConsumerComparator(Map> map) { + this.map = map; + } + + @Override + public int compare(String o1, String o2) { + int ret = map.get(o1).size() - map.get(o2).size(); + if (ret == 0) { + ret = o1.compareTo(o2); + } + return ret; + } + } + + private boolean isSameQueueIdExists(List mqAll, int prevMqId) { + for (MessageQueue mq : mqAll) { + if (mq.getQueueId() == prevMqId) { + return true; + } + } + return false; + } + + private boolean isBalanced(TreeSet sortedCurrentSubscriptions) { + int min = this.messageQueueAllocation.get(sortedCurrentSubscriptions.first()).size(); + int max = this.messageQueueAllocation.get(sortedCurrentSubscriptions.last()).size(); + // if minimum and maximum numbers of message queues allocated to consumers differ by at most 1 + return min >= max - 1; + } + + /** + * @return The balance score of the given allocation, which is the sum of assigned queues size difference of all + * consumer. A well balanced allocation with balance score of 0 (all consumers getting the same number of + * allocations). Lower balance score represents a more balanced allocation. + */ + private int getBalanceScore(Map> allocation) { + int score = 0; + + Map consumerAllocationSizes = new HashMap(); + for (Map.Entry> entry : allocation.entrySet()) { + consumerAllocationSizes.put(entry.getKey(), entry.getValue().size()); + } + + Iterator> it = consumerAllocationSizes.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry entry = it.next(); + int consumerAllocationSize = entry.getValue(); + it.remove(); + for (Map.Entry otherEntry : consumerAllocationSizes.entrySet()) { + score += Math.abs(consumerAllocationSize - otherEntry.getValue()); + } + } + + return score; + } + + private void deepCopy(Map> source, Map> dest) { + dest.clear(); + for (Map.Entry> entry : source.entrySet()) { + dest.put(entry.getKey(), new ArrayList(entry.getValue())); + } + } + + private List getPrevMqAll() { + List prevMqAll = new ArrayList(); + for (List queues : messageQueueAllocation.values()) { + prevMqAll.addAll(queues); + } + return prevMqAll; + } + + @Override + public String getName() { + return "STICKY"; + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueStrategyConstants.java b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueStrategyConstants.java new file mode 100644 index 0000000000000000000000000000000000000000..bbfefb320c5f413a0cecce1a35891b4ce180490e --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueStrategyConstants.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.rebalance; + +public class AllocateMessageQueueStrategyConstants { + + public static final String ALLOCATE_MACHINE_ROOM_NEARBY = "MACHINE_ROOM_NEARBY"; + + public static final String ALLOCATE_MESSAGE_QUEUE_AVERAGELY = "AVG"; + + public static final String ALLOCATE_MESSAGE_QUEUE_AVERAGELY_BY_CIRCLE = "AVG_BY_CIRCLE"; + + public static final String ALLOCATE_MESSAGE_QUEUE_BY_CONFIG = "CONFIG"; + + public static final String ALLOCATE_MESSAGE_QUEUE_BY_MACHINE_ROOM = "MACHINE_ROOM"; + + public static final String ALLOCATE_MESSAGE_QUEUE_CONSISTENT_HASH = "CONSISTENT_HASH"; + + public static final String ALLOCATE_MESSAGE_QUEUE_STICKY = "STICKY"; +} diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/AclClient.java b/example/src/main/java/org/apache/rocketmq/example/simple/AclClient.java index 0c97cd332102eb9e838b6506862f2389bcafc04e..0e674efa9cd084774636c1c67e76ed2f531c4db9 100644 --- a/example/src/main/java/org/apache/rocketmq/example/simple/AclClient.java +++ b/example/src/main/java/org/apache/rocketmq/example/simple/AclClient.java @@ -29,7 +29,6 @@ import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; -import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; @@ -37,6 +36,7 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragely; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java index a9b9ab03472df7fe911956a244838fb9a662391d..5e5d446149b30e27d910209cdc93533f4f169697 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java @@ -23,10 +23,10 @@ import java.util.Set; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; -import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragely; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;