diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index e32edc9498011e2c62095b84db6583bc08137f60..d51030a159240cd9b34aa4dfd9ca438b013cf577 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -166,10 +166,42 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
private int consumeConcurrentlyMaxSpan = 2000;
/**
- * Flow control threshold
+ * Flow control threshold on queue level, each message queue will cache at most 1000 messages by default,
+ * Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit
*/
private int pullThresholdForQueue = 1000;
+ /**
+ * Limit the cached message size on queue level, each message queue will cache at most 100 MiB messages by default,
+ * Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit
+ *
+ *
+ * The size of a message only measured by message body, so it's not accurate
+ */
+ private int pullThresholdSizeForQueue = 100;
+
+ /**
+ * Flow control threshold on topic level, default value is -1(Unlimited)
+ *
+ * The value of {@code pullThresholdForQueue} will be overwrote and calculated based on
+ * {@code pullThresholdForTopic} if it is't unlimited
+ *
+ * For example, if the value of pullThresholdForTopic is 1000 and 10 message queues are assigned to this consumer,
+ * then pullThresholdForQueue will be set to 100
+ */
+ private int pullThresholdForTopic = -1;
+
+ /**
+ * Limit the cached message size on topic level, default value is -1 MiB(Unlimited)
+ *
+ * The value of {@code pullThresholdSizeForQueue} will be overwrote and calculated based on
+ * {@code pullThresholdSizeForTopic} if it is't unlimited
+ *
+ * For example, if the value of pullThresholdSizeForTopic is 1000 MiB and 10 message queues are
+ * assigned to this consumer, then pullThresholdSizeForQueue will be set to 100 MiB
+ */
+ private int pullThresholdSizeForTopic = -1;
+
/**
* Message pull Interval
*/
@@ -407,6 +439,30 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
this.pullThresholdForQueue = pullThresholdForQueue;
}
+ public int getPullThresholdForTopic() {
+ return pullThresholdForTopic;
+ }
+
+ public void setPullThresholdForTopic(final int pullThresholdForTopic) {
+ this.pullThresholdForTopic = pullThresholdForTopic;
+ }
+
+ public int getPullThresholdSizeForQueue() {
+ return pullThresholdSizeForQueue;
+ }
+
+ public void setPullThresholdSizeForQueue(final int pullThresholdSizeForQueue) {
+ this.pullThresholdSizeForQueue = pullThresholdSizeForQueue;
+ }
+
+ public int getPullThresholdSizeForTopic() {
+ return pullThresholdSizeForTopic;
+ }
+
+ public void setPullThresholdSizeForTopic(final int pullThresholdSizeForTopic) {
+ this.pullThresholdSizeForTopic = pullThresholdSizeForTopic;
+ }
+
public Map getSubscription() {
return subscription;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 4ba6216b7e6931074a7cefec6ff29624b5aeadbb..72bc953f5664018b3b9aed1f482346846f4eaba8 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -106,8 +106,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private MessageListener messageListenerInner;
private OffsetStore offsetStore;
private ConsumeMessageService consumeMessageService;
- private long flowControlTimes1 = 0;
- private long flowControlTimes2 = 0;
+ private long queueFlowControlTimes = 0;
+ private long queueMaxSpanFlowControlTimes = 0;
public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
this.defaultMQPushConsumer = defaultMQPushConsumer;
@@ -219,13 +219,25 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
return;
}
- long size = processQueue.getMsgCount().get();
- if (size > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
+ long cachedMessageCount = processQueue.getMsgCount().get();
+ long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
+
+ if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
+ this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
+ if ((queueFlowControlTimes++ % 1000) == 0) {
+ log.warn(
+ "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
+ this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
+ }
+ return;
+ }
+
+ if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
- if ((flowControlTimes1++ % 1000) == 0) {
+ if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
- "the consumer message buffer is full, so do flow control, minOffset={}, maxOffset={}, size={}, pullRequest={}, flowControlTimes={}",
- processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), size, pullRequest, flowControlTimes1);
+ "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
+ this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}
@@ -233,11 +245,11 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
if (!this.consumeOrderly) {
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
- if ((flowControlTimes2++ % 1000) == 0) {
+ if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn(
"the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
- pullRequest, flowControlTimes2);
+ pullRequest, queueMaxSpanFlowControlTimes);
}
return;
}
@@ -732,6 +744,34 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
null);
}
+ // pullThresholdForTopic
+ if (this.defaultMQPushConsumer.getPullThresholdForTopic() != -1) {
+ if (this.defaultMQPushConsumer.getPullThresholdForTopic() < 1 || this.defaultMQPushConsumer.getPullThresholdForTopic() > 6553500) {
+ throw new MQClientException(
+ "pullThresholdForTopic Out of range [1, 6553500]"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
+ }
+ }
+
+ // pullThresholdSizeForQueue
+ if (this.defaultMQPushConsumer.getPullThresholdSizeForQueue() < 1 || this.defaultMQPushConsumer.getPullThresholdSizeForQueue() > 1024) {
+ throw new MQClientException(
+ "pullThresholdSizeForQueue Out of range [1, 1024]"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
+ }
+
+ if (this.defaultMQPushConsumer.getPullThresholdSizeForTopic() != -1) {
+ // pullThresholdSizeForTopic
+ if (this.defaultMQPushConsumer.getPullThresholdSizeForTopic() < 1 || this.defaultMQPushConsumer.getPullThresholdSizeForTopic() > 102400) {
+ throw new MQClientException(
+ "pullThresholdSizeForTopic Out of range [1, 102400]"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
+ }
+ }
+
// pullInterval
if (this.defaultMQPushConsumer.getPullInterval() < 0 || this.defaultMQPushConsumer.getPullInterval() > 65535) {
throw new MQClientException(
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
index 38b80738c60122bf596c653ac2c6accda038a7ec..e21dbc8f18a0e0e3905a282d510ce856da6d35cd 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
@@ -45,6 +45,7 @@ public class ProcessQueue {
private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
private final TreeMap msgTreeMap = new TreeMap();
private final AtomicLong msgCount = new AtomicLong();
+ private final AtomicLong msgSize = new AtomicLong();
private final Lock lockConsume = new ReentrantLock();
private final TreeMap msgTreeMapTemp = new TreeMap();
private final AtomicLong tryUnlockTimes = new AtomicLong(0);
@@ -129,6 +130,7 @@ public class ProcessQueue {
if (null == old) {
validMsgCnt++;
this.queueOffsetMax = msg.getQueueOffset();
+ msgSize.addAndGet(msg.getBody().length);
}
}
msgCount.addAndGet(validMsgCnt);
@@ -189,6 +191,7 @@ public class ProcessQueue {
MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
if (prev != null) {
removedCnt--;
+ msgSize.addAndGet(0 - msg.getBody().length);
}
}
msgCount.addAndGet(removedCnt);
@@ -215,6 +218,10 @@ public class ProcessQueue {
return msgCount;
}
+ public AtomicLong getMsgSize() {
+ return msgSize;
+ }
+
public boolean isDropped() {
return dropped;
}
@@ -250,7 +257,10 @@ public class ProcessQueue {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
Long offset = this.msgTreeMapTemp.lastKey();
- msgCount.addAndGet(this.msgTreeMapTemp.size() * (-1));
+ msgCount.addAndGet(0 - this.msgTreeMapTemp.size());
+ for (MessageExt msg : this.msgTreeMapTemp.values()) {
+ msgSize.addAndGet(0 - msg.getBody().length);
+ }
this.msgTreeMapTemp.clear();
if (offset != null) {
return offset + 1;
@@ -334,6 +344,7 @@ public class ProcessQueue {
this.msgTreeMap.clear();
this.msgTreeMapTemp.clear();
this.msgCount.set(0);
+ this.msgSize.set(0);
this.queueOffsetMax = 0L;
} finally {
this.lockTreeMap.writeLock().unlock();
@@ -387,6 +398,7 @@ public class ProcessQueue {
info.setCachedMsgMinOffset(this.msgTreeMap.firstKey());
info.setCachedMsgMaxOffset(this.msgTreeMap.lastKey());
info.setCachedMsgCount(this.msgTreeMap.size());
+ info.setCachedMsgSizeInMiB((int) (this.msgSize.get() / (1024 * 1024)));
}
if (!this.msgTreeMapTemp.isEmpty()) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
index 39e025172d4f8333880de09b8d74596b04fc69c3..e5166f35b59c003d4512f6f2e64965713ea8d051 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
@@ -57,6 +57,26 @@ public class RebalancePushImpl extends RebalanceImpl {
long newVersion = System.currentTimeMillis();
log.info("{} Rebalance changed, also update version: {}, {}", topic, subscriptionData.getSubVersion(), newVersion);
subscriptionData.setSubVersion(newVersion);
+
+ int currentQueueCount = this.processQueueTable.size();
+ if (currentQueueCount != 0) {
+ int pullThresholdForTopic = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForTopic();
+ if (pullThresholdForTopic != -1) {
+ int newVal = Math.max(1, pullThresholdForTopic / currentQueueCount);
+ log.info("The pullThresholdForQueue is changed from {} to {}",
+ this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForQueue(), newVal);
+ this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().setPullThresholdForQueue(newVal);
+ }
+
+ int pullThresholdSizeForTopic = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdSizeForTopic();
+ if (pullThresholdSizeForTopic != -1) {
+ int newVal = Math.max(1, pullThresholdSizeForTopic / currentQueueCount);
+ log.info("The pullThresholdSizeForQueue is changed from {} to {}",
+ this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdSizeForQueue(), newVal);
+ this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().setPullThresholdSizeForQueue(newVal);
+ }
+ }
+
// notify broker
this.getmQClientFactory().sendHeartbeatToAllBrokerWithLock();
}
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
index 94b3f0f9b6588cf4977a44bf23a51c2c6bf6cc48..b21edc9197cb38ff04461a61726b49c29e9ccdc7 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
@@ -32,6 +32,7 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
@@ -62,6 +63,7 @@ import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
@@ -207,6 +209,59 @@ public class DefaultMQPushConsumerTest {
assertThat(messageExts[0].getBody()).isEqualTo(new byte[] {'a'});
}
+ @Test
+ public void testCheckConfig() {
+ DefaultMQPushConsumer pushConsumer = createPushConsumer();
+
+ pushConsumer.setPullThresholdForQueue(65535 + 1);
+ try {
+ pushConsumer.start();
+ failBecauseExceptionWasNotThrown(MQClientException.class);
+ } catch (MQClientException e) {
+ assertThat(e).hasMessageContaining("pullThresholdForQueue Out of range [1, 65535]");
+ }
+
+ pushConsumer = createPushConsumer();
+ pushConsumer.setPullThresholdForTopic(65535 * 100 + 1);
+
+ try {
+ pushConsumer.start();
+ failBecauseExceptionWasNotThrown(MQClientException.class);
+ } catch (MQClientException e) {
+ assertThat(e).hasMessageContaining("pullThresholdForTopic Out of range [1, 6553500]");
+ }
+
+ pushConsumer = createPushConsumer();
+ pushConsumer.setPullThresholdSizeForQueue(1024 + 1);
+ try {
+ pushConsumer.start();
+ failBecauseExceptionWasNotThrown(MQClientException.class);
+ } catch (MQClientException e) {
+ assertThat(e).hasMessageContaining("pullThresholdSizeForQueue Out of range [1, 1024]");
+ }
+
+ pushConsumer = createPushConsumer();
+ pushConsumer.setPullThresholdSizeForTopic(1024 * 100 + 1);
+ try {
+ pushConsumer.start();
+ failBecauseExceptionWasNotThrown(MQClientException.class);
+ } catch (MQClientException e) {
+ assertThat(e).hasMessageContaining("pullThresholdSizeForTopic Out of range [1, 102400]");
+ }
+ }
+
+ private DefaultMQPushConsumer createPushConsumer() {
+ DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(consumerGroup);
+ pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
+ @Override
+ public ConsumeConcurrentlyStatus consumeMessage(List msgs,
+ ConsumeConcurrentlyContext context) {
+ return null;
+ }
+ });
+ return pushConsumer;
+ }
+
private PullRequest createPullRequest() {
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..d6a6dcf72f65a5dd476d53359d0b35feb1bc74c1
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ProcessQueueTest {
+
+ @Test
+ public void testCachedMessageCount() {
+ ProcessQueue pq = new ProcessQueue();
+
+ pq.putMessage(createMessageList());
+
+ assertThat(pq.getMsgCount().get()).isEqualTo(100);
+
+ pq.takeMessags(10);
+ pq.commit();
+
+ assertThat(pq.getMsgCount().get()).isEqualTo(90);
+
+ pq.removeMessage(Collections.singletonList(pq.getMsgTreeMap().lastEntry().getValue()));
+ assertThat(pq.getMsgCount().get()).isEqualTo(89);
+ }
+
+ @Test
+ public void testCachedMessageSize() {
+ ProcessQueue pq = new ProcessQueue();
+
+ pq.putMessage(createMessageList());
+
+ assertThat(pq.getMsgSize().get()).isEqualTo(100 * 123);
+
+ pq.takeMessags(10);
+ pq.commit();
+
+ assertThat(pq.getMsgSize().get()).isEqualTo(90 * 123);
+
+ pq.removeMessage(Collections.singletonList(pq.getMsgTreeMap().lastEntry().getValue()));
+ assertThat(pq.getMsgSize().get()).isEqualTo(89 * 123);
+ }
+
+ @Test
+ public void testFillProcessQueueInfo() {
+ ProcessQueue pq = new ProcessQueue();
+ pq.putMessage(createMessageList(102400));
+
+ ProcessQueueInfo processQueueInfo = new ProcessQueueInfo();
+ pq.fillProcessQueueInfo(processQueueInfo);
+
+ assertThat(processQueueInfo.getCachedMsgSizeInMiB()).isEqualTo(12);
+
+ pq.takeMessags(10000);
+ pq.commit();
+ pq.fillProcessQueueInfo(processQueueInfo);
+ assertThat(processQueueInfo.getCachedMsgSizeInMiB()).isEqualTo(10);
+
+ pq.takeMessags(10000);
+ pq.commit();
+ pq.fillProcessQueueInfo(processQueueInfo);
+ assertThat(processQueueInfo.getCachedMsgSizeInMiB()).isEqualTo(9);
+
+ pq.takeMessags(80000);
+ pq.commit();
+ pq.fillProcessQueueInfo(processQueueInfo);
+ assertThat(processQueueInfo.getCachedMsgSizeInMiB()).isEqualTo(0);
+ }
+
+ private List createMessageList() {
+ return createMessageList(100);
+ }
+
+ private List createMessageList(int count) {
+ List messageExtList = new ArrayList();
+ for (int i = 0; i < count; i++) {
+ MessageExt messageExt = new MessageExt();
+ messageExt.setQueueOffset(i);
+ messageExt.setBody(new byte[123]);
+ messageExtList.add(messageExt);
+ }
+ return messageExtList;
+ }
+}
\ No newline at end of file
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..796a3943087247c9e5fc75aa5b8fc1cc47c35fb0
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
+import org.apache.rocketmq.client.consumer.store.OffsetStore;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class RebalancePushImplTest {
+ @Spy
+ private DefaultMQPushConsumerImpl defaultMQPushConsumer = new DefaultMQPushConsumerImpl(new DefaultMQPushConsumer("RebalancePushImplTest"), null);
+ @Mock
+ private MQClientInstance mqClientInstance;
+ @Mock
+ private OffsetStore offsetStore;
+ private String consumerGroup = "CID_RebalancePushImplTest";
+ private String topic = "TopicA";
+
+ @Test
+ public void testMessageQueueChanged_CountThreshold() {
+ RebalancePushImpl rebalancePush = new RebalancePushImpl(consumerGroup, MessageModel.CLUSTERING,
+ new AllocateMessageQueueAveragely(), mqClientInstance, defaultMQPushConsumer);
+ init(rebalancePush);
+
+ // Just set pullThresholdForQueue
+ defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdForQueue(1024);
+ Set allocateResultSet = new HashSet();
+ allocateResultSet.add(new MessageQueue(topic, "BrokerA", 0));
+ allocateResultSet.add(new MessageQueue(topic, "BrokerA", 1));
+ doRebalanceForcibly(rebalancePush, allocateResultSet);
+ assertThat(defaultMQPushConsumer.getDefaultMQPushConsumer().getPullThresholdForQueue()).isEqualTo(1024);
+
+ // Set pullThresholdForTopic
+ defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdForTopic(1024);
+ doRebalanceForcibly(rebalancePush, allocateResultSet);
+ assertThat(defaultMQPushConsumer.getDefaultMQPushConsumer().getPullThresholdForQueue()).isEqualTo(512);
+
+ // Change message queue allocate result
+ allocateResultSet.add(new MessageQueue(topic, "BrokerA", 2));
+ doRebalanceForcibly(rebalancePush, allocateResultSet);
+ assertThat(defaultMQPushConsumer.getDefaultMQPushConsumer().getPullThresholdForQueue()).isEqualTo(341);
+ }
+
+ private void doRebalanceForcibly(RebalancePushImpl rebalancePush, Set allocateResultSet) {
+ rebalancePush.topicSubscribeInfoTable.put(topic, allocateResultSet);
+ rebalancePush.doRebalance(false);
+ rebalancePush.messageQueueChanged(topic, allocateResultSet, allocateResultSet);
+ }
+
+ private void init(final RebalancePushImpl rebalancePush) {
+ rebalancePush.getSubscriptionInner().putIfAbsent(topic, new SubscriptionData());
+
+ rebalancePush.subscriptionInner.putIfAbsent(topic, new SubscriptionData());
+
+ when(mqClientInstance.findConsumerIdList(anyString(), anyString())).thenReturn(Collections.singletonList(consumerGroup));
+ when(mqClientInstance.getClientId()).thenReturn(consumerGroup);
+ when(defaultMQPushConsumer.getOffsetStore()).thenReturn(offsetStore);
+
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(final InvocationOnMock invocation) throws Throwable {
+ return null;
+ }
+ }).when(defaultMQPushConsumer).executePullRequestImmediately(any(PullRequest.class));
+ }
+
+ @Test
+ public void testMessageQueueChanged_SizeThreshold() {
+ RebalancePushImpl rebalancePush = new RebalancePushImpl(consumerGroup, MessageModel.CLUSTERING,
+ new AllocateMessageQueueAveragely(), mqClientInstance, defaultMQPushConsumer);
+ init(rebalancePush);
+
+ // Just set pullThresholdSizeForQueue
+ defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdSizeForQueue(1024);
+ Set allocateResultSet = new HashSet();
+ allocateResultSet.add(new MessageQueue(topic, "BrokerA", 0));
+ allocateResultSet.add(new MessageQueue(topic, "BrokerA", 1));
+ doRebalanceForcibly(rebalancePush, allocateResultSet);
+ assertThat(defaultMQPushConsumer.getDefaultMQPushConsumer().getPullThresholdSizeForQueue()).isEqualTo(1024);
+
+ // Set pullThresholdSizeForTopic
+ defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdSizeForTopic(1024);
+ doRebalanceForcibly(rebalancePush, allocateResultSet);
+ assertThat(defaultMQPushConsumer.getDefaultMQPushConsumer().getPullThresholdSizeForQueue()).isEqualTo(512);
+
+ // Change message queue allocate result
+ allocateResultSet.add(new MessageQueue(topic, "BrokerA", 2));
+ doRebalanceForcibly(rebalancePush, allocateResultSet);
+ assertThat(defaultMQPushConsumer.getDefaultMQPushConsumer().getPullThresholdSizeForQueue()).isEqualTo(341);
+ }
+
+ @Test
+ public void testMessageQueueChanged_ConsumerRuntimeInfo() throws MQClientException {
+ RebalancePushImpl rebalancePush = new RebalancePushImpl(consumerGroup, MessageModel.CLUSTERING,
+ new AllocateMessageQueueAveragely(), mqClientInstance, defaultMQPushConsumer);
+ init(rebalancePush);
+
+ defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdSizeForQueue(1024);
+ defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdForQueue(1024);
+ Set allocateResultSet = new HashSet();
+ allocateResultSet.add(new MessageQueue(topic, "BrokerA", 0));
+ allocateResultSet.add(new MessageQueue(topic, "BrokerA", 1));
+ doRebalanceForcibly(rebalancePush, allocateResultSet);
+
+ defaultMQPushConsumer.setConsumeMessageService(new ConsumeMessageConcurrentlyService(defaultMQPushConsumer, null));
+ assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdSizeForQueue")).isEqualTo("1024");
+ assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdForQueue")).isEqualTo("1024");
+ assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdSizeForTopic")).isEqualTo("-1");
+ assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdForTopic")).isEqualTo("-1");
+
+ defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdSizeForTopic(1024);
+ defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdForTopic(1024);
+ doRebalanceForcibly(rebalancePush, allocateResultSet);
+ assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdSizeForQueue")).isEqualTo("512");
+ assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdForQueue")).isEqualTo("512");
+ assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdSizeForTopic")).isEqualTo("1024");
+ assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdForTopic")).isEqualTo("1024");
+
+ // Change message queue allocate result
+ allocateResultSet.add(new MessageQueue(topic, "BrokerA", 2));
+ doRebalanceForcibly(rebalancePush, allocateResultSet);
+ assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdSizeForQueue")).isEqualTo("341");
+ assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdForQueue")).isEqualTo("341");
+ assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdSizeForTopic")).isEqualTo("1024");
+ assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdForTopic")).isEqualTo("1024");
+ }
+}
\ No newline at end of file
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ProcessQueueInfo.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ProcessQueueInfo.java
index e2e99430cbff173c070ba183e741edd8f04e6e1c..6b220b812168e56b505ffc340adf0965081e52ef 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ProcessQueueInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ProcessQueueInfo.java
@@ -25,6 +25,7 @@ public class ProcessQueueInfo {
private long cachedMsgMinOffset;
private long cachedMsgMaxOffset;
private int cachedMsgCount;
+ private int cachedMsgSizeInMiB;
private long transactionMsgMinOffset;
private long transactionMsgMaxOffset;
@@ -142,16 +143,24 @@ public class ProcessQueueInfo {
this.lastConsumeTimestamp = lastConsumeTimestamp;
}
+ public int getCachedMsgSizeInMiB() {
+ return cachedMsgSizeInMiB;
+ }
+
+ public void setCachedMsgSizeInMiB(final int cachedMsgSizeInMiB) {
+ this.cachedMsgSizeInMiB = cachedMsgSizeInMiB;
+ }
+
@Override
public String toString() {
return "ProcessQueueInfo [commitOffset=" + commitOffset + ", cachedMsgMinOffset="
- + cachedMsgMinOffset + ", cachedMsgMaxOffset=" + cachedMsgMaxOffset + ", cachedMsgCount="
- + cachedMsgCount + ", transactionMsgMinOffset=" + transactionMsgMinOffset
+ + cachedMsgMinOffset + ", cachedMsgMaxOffset=" + cachedMsgMaxOffset
+ + ", cachedMsgCount=" + cachedMsgCount + ", cachedMsgSizeInMiB=" + cachedMsgSizeInMiB
+ + ", transactionMsgMinOffset=" + transactionMsgMinOffset
+ ", transactionMsgMaxOffset=" + transactionMsgMaxOffset + ", transactionMsgCount="
+ transactionMsgCount + ", locked=" + locked + ", tryUnlockTimes=" + tryUnlockTimes
+ ", lastLockTimestamp=" + UtilAll.timeMillisToHumanString(lastLockTimestamp) + ", droped="
+ droped + ", lastPullTimestamp=" + UtilAll.timeMillisToHumanString(lastPullTimestamp)
+ ", lastConsumeTimestamp=" + UtilAll.timeMillisToHumanString(lastConsumeTimestamp) + "]";
-
}
}