From 0b80f8092ed0f3fdae7ae8c3212a4286b7f24cfc Mon Sep 17 00:00:00 2001 From: yukon Date: Wed, 18 Oct 2017 10:42:04 +0800 Subject: [PATCH] [ROCKETMQ-294] Do flow control on the number and size dimensions when pull message Author: yukon Closes #171 from zhouxinyu/ROCKETMQ-294. --- .../consumer/DefaultMQPushConsumer.java | 58 ++++++- .../consumer/DefaultMQPushConsumerImpl.java | 58 ++++++- .../client/impl/consumer/ProcessQueue.java | 14 +- .../impl/consumer/RebalancePushImpl.java | 20 +++ .../consumer/DefaultMQPushConsumerTest.java | 55 ++++++ .../impl/consumer/ProcessQueueTest.java | 107 ++++++++++++ .../impl/consumer/RebalancePushImplTest.java | 163 ++++++++++++++++++ .../protocol/body/ProcessQueueInfo.java | 15 +- 8 files changed, 476 insertions(+), 14 deletions(-) create mode 100644 client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java create mode 100644 client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java index e32edc94..d51030a1 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java @@ -166,10 +166,42 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume private int consumeConcurrentlyMaxSpan = 2000; /** - * Flow control threshold + * Flow control threshold on queue level, each message queue will cache at most 1000 messages by default, + * Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit */ private int pullThresholdForQueue = 1000; + /** + * Limit the cached message size on queue level, each message queue will cache at most 100 MiB messages by default, + * Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit + * + *

+ * The size of a message only measured by message body, so it's not accurate + */ + private int pullThresholdSizeForQueue = 100; + + /** + * Flow control threshold on topic level, default value is -1(Unlimited) + *

+ * The value of {@code pullThresholdForQueue} will be overwrote and calculated based on + * {@code pullThresholdForTopic} if it is't unlimited + *

+ * For example, if the value of pullThresholdForTopic is 1000 and 10 message queues are assigned to this consumer, + * then pullThresholdForQueue will be set to 100 + */ + private int pullThresholdForTopic = -1; + + /** + * Limit the cached message size on topic level, default value is -1 MiB(Unlimited) + *

+ * The value of {@code pullThresholdSizeForQueue} will be overwrote and calculated based on + * {@code pullThresholdSizeForTopic} if it is't unlimited + *

+ * For example, if the value of pullThresholdSizeForTopic is 1000 MiB and 10 message queues are + * assigned to this consumer, then pullThresholdSizeForQueue will be set to 100 MiB + */ + private int pullThresholdSizeForTopic = -1; + /** * Message pull Interval */ @@ -407,6 +439,30 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume this.pullThresholdForQueue = pullThresholdForQueue; } + public int getPullThresholdForTopic() { + return pullThresholdForTopic; + } + + public void setPullThresholdForTopic(final int pullThresholdForTopic) { + this.pullThresholdForTopic = pullThresholdForTopic; + } + + public int getPullThresholdSizeForQueue() { + return pullThresholdSizeForQueue; + } + + public void setPullThresholdSizeForQueue(final int pullThresholdSizeForQueue) { + this.pullThresholdSizeForQueue = pullThresholdSizeForQueue; + } + + public int getPullThresholdSizeForTopic() { + return pullThresholdSizeForTopic; + } + + public void setPullThresholdSizeForTopic(final int pullThresholdSizeForTopic) { + this.pullThresholdSizeForTopic = pullThresholdSizeForTopic; + } + public Map getSubscription() { return subscription; } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index 4ba6216b..72bc953f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -106,8 +106,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { private MessageListener messageListenerInner; private OffsetStore offsetStore; private ConsumeMessageService consumeMessageService; - private long flowControlTimes1 = 0; - private long flowControlTimes2 = 0; + private long queueFlowControlTimes = 0; + private long queueMaxSpanFlowControlTimes = 0; public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) { this.defaultMQPushConsumer = defaultMQPushConsumer; @@ -219,13 +219,25 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { return; } - long size = processQueue.getMsgCount().get(); - if (size > this.defaultMQPushConsumer.getPullThresholdForQueue()) { + long cachedMessageCount = processQueue.getMsgCount().get(); + long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024); + + if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) { + this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); + if ((queueFlowControlTimes++ % 1000) == 0) { + log.warn( + "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", + this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); + } + return; + } + + if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); - if ((flowControlTimes1++ % 1000) == 0) { + if ((queueFlowControlTimes++ % 1000) == 0) { log.warn( - "the consumer message buffer is full, so do flow control, minOffset={}, maxOffset={}, size={}, pullRequest={}, flowControlTimes={}", - processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), size, pullRequest, flowControlTimes1); + "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", + this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); } return; } @@ -233,11 +245,11 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { if (!this.consumeOrderly) { if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); - if ((flowControlTimes2++ % 1000) == 0) { + if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) { log.warn( "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}", processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), - pullRequest, flowControlTimes2); + pullRequest, queueMaxSpanFlowControlTimes); } return; } @@ -732,6 +744,34 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { null); } + // pullThresholdForTopic + if (this.defaultMQPushConsumer.getPullThresholdForTopic() != -1) { + if (this.defaultMQPushConsumer.getPullThresholdForTopic() < 1 || this.defaultMQPushConsumer.getPullThresholdForTopic() > 6553500) { + throw new MQClientException( + "pullThresholdForTopic Out of range [1, 6553500]" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); + } + } + + // pullThresholdSizeForQueue + if (this.defaultMQPushConsumer.getPullThresholdSizeForQueue() < 1 || this.defaultMQPushConsumer.getPullThresholdSizeForQueue() > 1024) { + throw new MQClientException( + "pullThresholdSizeForQueue Out of range [1, 1024]" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); + } + + if (this.defaultMQPushConsumer.getPullThresholdSizeForTopic() != -1) { + // pullThresholdSizeForTopic + if (this.defaultMQPushConsumer.getPullThresholdSizeForTopic() < 1 || this.defaultMQPushConsumer.getPullThresholdSizeForTopic() > 102400) { + throw new MQClientException( + "pullThresholdSizeForTopic Out of range [1, 102400]" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); + } + } + // pullInterval if (this.defaultMQPushConsumer.getPullInterval() < 0 || this.defaultMQPushConsumer.getPullInterval() > 65535) { throw new MQClientException( diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java index 38b80738..e21dbc8f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java @@ -45,6 +45,7 @@ public class ProcessQueue { private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock(); private final TreeMap msgTreeMap = new TreeMap(); private final AtomicLong msgCount = new AtomicLong(); + private final AtomicLong msgSize = new AtomicLong(); private final Lock lockConsume = new ReentrantLock(); private final TreeMap msgTreeMapTemp = new TreeMap(); private final AtomicLong tryUnlockTimes = new AtomicLong(0); @@ -129,6 +130,7 @@ public class ProcessQueue { if (null == old) { validMsgCnt++; this.queueOffsetMax = msg.getQueueOffset(); + msgSize.addAndGet(msg.getBody().length); } } msgCount.addAndGet(validMsgCnt); @@ -189,6 +191,7 @@ public class ProcessQueue { MessageExt prev = msgTreeMap.remove(msg.getQueueOffset()); if (prev != null) { removedCnt--; + msgSize.addAndGet(0 - msg.getBody().length); } } msgCount.addAndGet(removedCnt); @@ -215,6 +218,10 @@ public class ProcessQueue { return msgCount; } + public AtomicLong getMsgSize() { + return msgSize; + } + public boolean isDropped() { return dropped; } @@ -250,7 +257,10 @@ public class ProcessQueue { this.lockTreeMap.writeLock().lockInterruptibly(); try { Long offset = this.msgTreeMapTemp.lastKey(); - msgCount.addAndGet(this.msgTreeMapTemp.size() * (-1)); + msgCount.addAndGet(0 - this.msgTreeMapTemp.size()); + for (MessageExt msg : this.msgTreeMapTemp.values()) { + msgSize.addAndGet(0 - msg.getBody().length); + } this.msgTreeMapTemp.clear(); if (offset != null) { return offset + 1; @@ -334,6 +344,7 @@ public class ProcessQueue { this.msgTreeMap.clear(); this.msgTreeMapTemp.clear(); this.msgCount.set(0); + this.msgSize.set(0); this.queueOffsetMax = 0L; } finally { this.lockTreeMap.writeLock().unlock(); @@ -387,6 +398,7 @@ public class ProcessQueue { info.setCachedMsgMinOffset(this.msgTreeMap.firstKey()); info.setCachedMsgMaxOffset(this.msgTreeMap.lastKey()); info.setCachedMsgCount(this.msgTreeMap.size()); + info.setCachedMsgSizeInMiB((int) (this.msgSize.get() / (1024 * 1024))); } if (!this.msgTreeMapTemp.isEmpty()) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java index 39e02517..e5166f35 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java @@ -57,6 +57,26 @@ public class RebalancePushImpl extends RebalanceImpl { long newVersion = System.currentTimeMillis(); log.info("{} Rebalance changed, also update version: {}, {}", topic, subscriptionData.getSubVersion(), newVersion); subscriptionData.setSubVersion(newVersion); + + int currentQueueCount = this.processQueueTable.size(); + if (currentQueueCount != 0) { + int pullThresholdForTopic = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForTopic(); + if (pullThresholdForTopic != -1) { + int newVal = Math.max(1, pullThresholdForTopic / currentQueueCount); + log.info("The pullThresholdForQueue is changed from {} to {}", + this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForQueue(), newVal); + this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().setPullThresholdForQueue(newVal); + } + + int pullThresholdSizeForTopic = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdSizeForTopic(); + if (pullThresholdSizeForTopic != -1) { + int newVal = Math.max(1, pullThresholdSizeForTopic / currentQueueCount); + log.info("The pullThresholdSizeForQueue is changed from {} to {}", + this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdSizeForQueue(), newVal); + this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().setPullThresholdSizeForQueue(newVal); + } + } + // notify broker this.getmQClientFactory().sendHeartbeatToAllBrokerWithLock(); } diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java index 94b3f0f9..b21edc91 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java @@ -32,6 +32,7 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.CommunicationMode; import org.apache.rocketmq.client.impl.FindBrokerResult; import org.apache.rocketmq.client.impl.MQClientAPIImpl; @@ -62,6 +63,7 @@ import org.mockito.junit.MockitoJUnitRunner; import org.mockito.stubbing.Answer; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyLong; @@ -207,6 +209,59 @@ public class DefaultMQPushConsumerTest { assertThat(messageExts[0].getBody()).isEqualTo(new byte[] {'a'}); } + @Test + public void testCheckConfig() { + DefaultMQPushConsumer pushConsumer = createPushConsumer(); + + pushConsumer.setPullThresholdForQueue(65535 + 1); + try { + pushConsumer.start(); + failBecauseExceptionWasNotThrown(MQClientException.class); + } catch (MQClientException e) { + assertThat(e).hasMessageContaining("pullThresholdForQueue Out of range [1, 65535]"); + } + + pushConsumer = createPushConsumer(); + pushConsumer.setPullThresholdForTopic(65535 * 100 + 1); + + try { + pushConsumer.start(); + failBecauseExceptionWasNotThrown(MQClientException.class); + } catch (MQClientException e) { + assertThat(e).hasMessageContaining("pullThresholdForTopic Out of range [1, 6553500]"); + } + + pushConsumer = createPushConsumer(); + pushConsumer.setPullThresholdSizeForQueue(1024 + 1); + try { + pushConsumer.start(); + failBecauseExceptionWasNotThrown(MQClientException.class); + } catch (MQClientException e) { + assertThat(e).hasMessageContaining("pullThresholdSizeForQueue Out of range [1, 1024]"); + } + + pushConsumer = createPushConsumer(); + pushConsumer.setPullThresholdSizeForTopic(1024 * 100 + 1); + try { + pushConsumer.start(); + failBecauseExceptionWasNotThrown(MQClientException.class); + } catch (MQClientException e) { + assertThat(e).hasMessageContaining("pullThresholdSizeForTopic Out of range [1, 102400]"); + } + } + + private DefaultMQPushConsumer createPushConsumer() { + DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(consumerGroup); + pushConsumer.registerMessageListener(new MessageListenerConcurrently() { + @Override + public ConsumeConcurrentlyStatus consumeMessage(List msgs, + ConsumeConcurrentlyContext context) { + return null; + } + }); + return pushConsumer; + } + private PullRequest createPullRequest() { PullRequest pullRequest = new PullRequest(); pullRequest.setConsumerGroup(consumerGroup); diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java new file mode 100644 index 00000000..d6a6dcf7 --- /dev/null +++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.impl.consumer; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.assertj.core.api.Assertions.assertThat; + +@RunWith(MockitoJUnitRunner.class) +public class ProcessQueueTest { + + @Test + public void testCachedMessageCount() { + ProcessQueue pq = new ProcessQueue(); + + pq.putMessage(createMessageList()); + + assertThat(pq.getMsgCount().get()).isEqualTo(100); + + pq.takeMessags(10); + pq.commit(); + + assertThat(pq.getMsgCount().get()).isEqualTo(90); + + pq.removeMessage(Collections.singletonList(pq.getMsgTreeMap().lastEntry().getValue())); + assertThat(pq.getMsgCount().get()).isEqualTo(89); + } + + @Test + public void testCachedMessageSize() { + ProcessQueue pq = new ProcessQueue(); + + pq.putMessage(createMessageList()); + + assertThat(pq.getMsgSize().get()).isEqualTo(100 * 123); + + pq.takeMessags(10); + pq.commit(); + + assertThat(pq.getMsgSize().get()).isEqualTo(90 * 123); + + pq.removeMessage(Collections.singletonList(pq.getMsgTreeMap().lastEntry().getValue())); + assertThat(pq.getMsgSize().get()).isEqualTo(89 * 123); + } + + @Test + public void testFillProcessQueueInfo() { + ProcessQueue pq = new ProcessQueue(); + pq.putMessage(createMessageList(102400)); + + ProcessQueueInfo processQueueInfo = new ProcessQueueInfo(); + pq.fillProcessQueueInfo(processQueueInfo); + + assertThat(processQueueInfo.getCachedMsgSizeInMiB()).isEqualTo(12); + + pq.takeMessags(10000); + pq.commit(); + pq.fillProcessQueueInfo(processQueueInfo); + assertThat(processQueueInfo.getCachedMsgSizeInMiB()).isEqualTo(10); + + pq.takeMessags(10000); + pq.commit(); + pq.fillProcessQueueInfo(processQueueInfo); + assertThat(processQueueInfo.getCachedMsgSizeInMiB()).isEqualTo(9); + + pq.takeMessags(80000); + pq.commit(); + pq.fillProcessQueueInfo(processQueueInfo); + assertThat(processQueueInfo.getCachedMsgSizeInMiB()).isEqualTo(0); + } + + private List createMessageList() { + return createMessageList(100); + } + + private List createMessageList(int count) { + List messageExtList = new ArrayList(); + for (int i = 0; i < count; i++) { + MessageExt messageExt = new MessageExt(); + messageExt.setQueueOffset(i); + messageExt.setBody(new byte[123]); + messageExtList.add(messageExt); + } + return messageExtList; + } +} \ No newline at end of file diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java new file mode 100644 index 00000000..796a3943 --- /dev/null +++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.impl.consumer; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; +import org.apache.rocketmq.client.consumer.store.OffsetStore; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.junit.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class RebalancePushImplTest { + @Spy + private DefaultMQPushConsumerImpl defaultMQPushConsumer = new DefaultMQPushConsumerImpl(new DefaultMQPushConsumer("RebalancePushImplTest"), null); + @Mock + private MQClientInstance mqClientInstance; + @Mock + private OffsetStore offsetStore; + private String consumerGroup = "CID_RebalancePushImplTest"; + private String topic = "TopicA"; + + @Test + public void testMessageQueueChanged_CountThreshold() { + RebalancePushImpl rebalancePush = new RebalancePushImpl(consumerGroup, MessageModel.CLUSTERING, + new AllocateMessageQueueAveragely(), mqClientInstance, defaultMQPushConsumer); + init(rebalancePush); + + // Just set pullThresholdForQueue + defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdForQueue(1024); + Set allocateResultSet = new HashSet(); + allocateResultSet.add(new MessageQueue(topic, "BrokerA", 0)); + allocateResultSet.add(new MessageQueue(topic, "BrokerA", 1)); + doRebalanceForcibly(rebalancePush, allocateResultSet); + assertThat(defaultMQPushConsumer.getDefaultMQPushConsumer().getPullThresholdForQueue()).isEqualTo(1024); + + // Set pullThresholdForTopic + defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdForTopic(1024); + doRebalanceForcibly(rebalancePush, allocateResultSet); + assertThat(defaultMQPushConsumer.getDefaultMQPushConsumer().getPullThresholdForQueue()).isEqualTo(512); + + // Change message queue allocate result + allocateResultSet.add(new MessageQueue(topic, "BrokerA", 2)); + doRebalanceForcibly(rebalancePush, allocateResultSet); + assertThat(defaultMQPushConsumer.getDefaultMQPushConsumer().getPullThresholdForQueue()).isEqualTo(341); + } + + private void doRebalanceForcibly(RebalancePushImpl rebalancePush, Set allocateResultSet) { + rebalancePush.topicSubscribeInfoTable.put(topic, allocateResultSet); + rebalancePush.doRebalance(false); + rebalancePush.messageQueueChanged(topic, allocateResultSet, allocateResultSet); + } + + private void init(final RebalancePushImpl rebalancePush) { + rebalancePush.getSubscriptionInner().putIfAbsent(topic, new SubscriptionData()); + + rebalancePush.subscriptionInner.putIfAbsent(topic, new SubscriptionData()); + + when(mqClientInstance.findConsumerIdList(anyString(), anyString())).thenReturn(Collections.singletonList(consumerGroup)); + when(mqClientInstance.getClientId()).thenReturn(consumerGroup); + when(defaultMQPushConsumer.getOffsetStore()).thenReturn(offsetStore); + + doAnswer(new Answer() { + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable { + return null; + } + }).when(defaultMQPushConsumer).executePullRequestImmediately(any(PullRequest.class)); + } + + @Test + public void testMessageQueueChanged_SizeThreshold() { + RebalancePushImpl rebalancePush = new RebalancePushImpl(consumerGroup, MessageModel.CLUSTERING, + new AllocateMessageQueueAveragely(), mqClientInstance, defaultMQPushConsumer); + init(rebalancePush); + + // Just set pullThresholdSizeForQueue + defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdSizeForQueue(1024); + Set allocateResultSet = new HashSet(); + allocateResultSet.add(new MessageQueue(topic, "BrokerA", 0)); + allocateResultSet.add(new MessageQueue(topic, "BrokerA", 1)); + doRebalanceForcibly(rebalancePush, allocateResultSet); + assertThat(defaultMQPushConsumer.getDefaultMQPushConsumer().getPullThresholdSizeForQueue()).isEqualTo(1024); + + // Set pullThresholdSizeForTopic + defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdSizeForTopic(1024); + doRebalanceForcibly(rebalancePush, allocateResultSet); + assertThat(defaultMQPushConsumer.getDefaultMQPushConsumer().getPullThresholdSizeForQueue()).isEqualTo(512); + + // Change message queue allocate result + allocateResultSet.add(new MessageQueue(topic, "BrokerA", 2)); + doRebalanceForcibly(rebalancePush, allocateResultSet); + assertThat(defaultMQPushConsumer.getDefaultMQPushConsumer().getPullThresholdSizeForQueue()).isEqualTo(341); + } + + @Test + public void testMessageQueueChanged_ConsumerRuntimeInfo() throws MQClientException { + RebalancePushImpl rebalancePush = new RebalancePushImpl(consumerGroup, MessageModel.CLUSTERING, + new AllocateMessageQueueAveragely(), mqClientInstance, defaultMQPushConsumer); + init(rebalancePush); + + defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdSizeForQueue(1024); + defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdForQueue(1024); + Set allocateResultSet = new HashSet(); + allocateResultSet.add(new MessageQueue(topic, "BrokerA", 0)); + allocateResultSet.add(new MessageQueue(topic, "BrokerA", 1)); + doRebalanceForcibly(rebalancePush, allocateResultSet); + + defaultMQPushConsumer.setConsumeMessageService(new ConsumeMessageConcurrentlyService(defaultMQPushConsumer, null)); + assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdSizeForQueue")).isEqualTo("1024"); + assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdForQueue")).isEqualTo("1024"); + assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdSizeForTopic")).isEqualTo("-1"); + assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdForTopic")).isEqualTo("-1"); + + defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdSizeForTopic(1024); + defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdForTopic(1024); + doRebalanceForcibly(rebalancePush, allocateResultSet); + assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdSizeForQueue")).isEqualTo("512"); + assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdForQueue")).isEqualTo("512"); + assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdSizeForTopic")).isEqualTo("1024"); + assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdForTopic")).isEqualTo("1024"); + + // Change message queue allocate result + allocateResultSet.add(new MessageQueue(topic, "BrokerA", 2)); + doRebalanceForcibly(rebalancePush, allocateResultSet); + assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdSizeForQueue")).isEqualTo("341"); + assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdForQueue")).isEqualTo("341"); + assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdSizeForTopic")).isEqualTo("1024"); + assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdForTopic")).isEqualTo("1024"); + } +} \ No newline at end of file diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ProcessQueueInfo.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ProcessQueueInfo.java index e2e99430..6b220b81 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ProcessQueueInfo.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ProcessQueueInfo.java @@ -25,6 +25,7 @@ public class ProcessQueueInfo { private long cachedMsgMinOffset; private long cachedMsgMaxOffset; private int cachedMsgCount; + private int cachedMsgSizeInMiB; private long transactionMsgMinOffset; private long transactionMsgMaxOffset; @@ -142,16 +143,24 @@ public class ProcessQueueInfo { this.lastConsumeTimestamp = lastConsumeTimestamp; } + public int getCachedMsgSizeInMiB() { + return cachedMsgSizeInMiB; + } + + public void setCachedMsgSizeInMiB(final int cachedMsgSizeInMiB) { + this.cachedMsgSizeInMiB = cachedMsgSizeInMiB; + } + @Override public String toString() { return "ProcessQueueInfo [commitOffset=" + commitOffset + ", cachedMsgMinOffset=" - + cachedMsgMinOffset + ", cachedMsgMaxOffset=" + cachedMsgMaxOffset + ", cachedMsgCount=" - + cachedMsgCount + ", transactionMsgMinOffset=" + transactionMsgMinOffset + + cachedMsgMinOffset + ", cachedMsgMaxOffset=" + cachedMsgMaxOffset + + ", cachedMsgCount=" + cachedMsgCount + ", cachedMsgSizeInMiB=" + cachedMsgSizeInMiB + + ", transactionMsgMinOffset=" + transactionMsgMinOffset + ", transactionMsgMaxOffset=" + transactionMsgMaxOffset + ", transactionMsgCount=" + transactionMsgCount + ", locked=" + locked + ", tryUnlockTimes=" + tryUnlockTimes + ", lastLockTimestamp=" + UtilAll.timeMillisToHumanString(lastLockTimestamp) + ", droped=" + droped + ", lastPullTimestamp=" + UtilAll.timeMillisToHumanString(lastPullTimestamp) + ", lastConsumeTimestamp=" + UtilAll.timeMillisToHumanString(lastConsumeTimestamp) + "]"; - } } -- GitLab