提交 0b80f809 编写于 作者: Y yukon 提交者: dongeforever

[ROCKETMQ-294] Do flow control on the number and size dimensions when pull message

Author: yukon <yukon@apache.org>

Closes #171 from zhouxinyu/ROCKETMQ-294.
上级 bbd27c15
......@@ -166,10 +166,42 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
private int consumeConcurrentlyMaxSpan = 2000;
/**
* Flow control threshold
* Flow control threshold on queue level, each message queue will cache at most 1000 messages by default,
* Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit
*/
private int pullThresholdForQueue = 1000;
/**
* Limit the cached message size on queue level, each message queue will cache at most 100 MiB messages by default,
* Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit
*
* <p>
* The size of a message only measured by message body, so it's not accurate
*/
private int pullThresholdSizeForQueue = 100;
/**
* Flow control threshold on topic level, default value is -1(Unlimited)
* <p>
* The value of {@code pullThresholdForQueue} will be overwrote and calculated based on
* {@code pullThresholdForTopic} if it is't unlimited
* <p>
* For example, if the value of pullThresholdForTopic is 1000 and 10 message queues are assigned to this consumer,
* then pullThresholdForQueue will be set to 100
*/
private int pullThresholdForTopic = -1;
/**
* Limit the cached message size on topic level, default value is -1 MiB(Unlimited)
* <p>
* The value of {@code pullThresholdSizeForQueue} will be overwrote and calculated based on
* {@code pullThresholdSizeForTopic} if it is't unlimited
* <p>
* For example, if the value of pullThresholdSizeForTopic is 1000 MiB and 10 message queues are
* assigned to this consumer, then pullThresholdSizeForQueue will be set to 100 MiB
*/
private int pullThresholdSizeForTopic = -1;
/**
* Message pull Interval
*/
......@@ -407,6 +439,30 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
this.pullThresholdForQueue = pullThresholdForQueue;
}
public int getPullThresholdForTopic() {
return pullThresholdForTopic;
}
public void setPullThresholdForTopic(final int pullThresholdForTopic) {
this.pullThresholdForTopic = pullThresholdForTopic;
}
public int getPullThresholdSizeForQueue() {
return pullThresholdSizeForQueue;
}
public void setPullThresholdSizeForQueue(final int pullThresholdSizeForQueue) {
this.pullThresholdSizeForQueue = pullThresholdSizeForQueue;
}
public int getPullThresholdSizeForTopic() {
return pullThresholdSizeForTopic;
}
public void setPullThresholdSizeForTopic(final int pullThresholdSizeForTopic) {
this.pullThresholdSizeForTopic = pullThresholdSizeForTopic;
}
public Map<String, String> getSubscription() {
return subscription;
}
......
......@@ -106,8 +106,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private MessageListener messageListenerInner;
private OffsetStore offsetStore;
private ConsumeMessageService consumeMessageService;
private long flowControlTimes1 = 0;
private long flowControlTimes2 = 0;
private long queueFlowControlTimes = 0;
private long queueMaxSpanFlowControlTimes = 0;
public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
this.defaultMQPushConsumer = defaultMQPushConsumer;
......@@ -219,13 +219,25 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
return;
}
long size = processQueue.getMsgCount().get();
if (size > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((flowControlTimes1++ % 1000) == 0) {
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the consumer message buffer is full, so do flow control, minOffset={}, maxOffset={}, size={}, pullRequest={}, flowControlTimes={}",
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), size, pullRequest, flowControlTimes1);
"the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}
......@@ -233,11 +245,11 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
if (!this.consumeOrderly) {
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((flowControlTimes2++ % 1000) == 0) {
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn(
"the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
pullRequest, flowControlTimes2);
pullRequest, queueMaxSpanFlowControlTimes);
}
return;
}
......@@ -732,6 +744,34 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
null);
}
// pullThresholdForTopic
if (this.defaultMQPushConsumer.getPullThresholdForTopic() != -1) {
if (this.defaultMQPushConsumer.getPullThresholdForTopic() < 1 || this.defaultMQPushConsumer.getPullThresholdForTopic() > 6553500) {
throw new MQClientException(
"pullThresholdForTopic Out of range [1, 6553500]"
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
}
}
// pullThresholdSizeForQueue
if (this.defaultMQPushConsumer.getPullThresholdSizeForQueue() < 1 || this.defaultMQPushConsumer.getPullThresholdSizeForQueue() > 1024) {
throw new MQClientException(
"pullThresholdSizeForQueue Out of range [1, 1024]"
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
}
if (this.defaultMQPushConsumer.getPullThresholdSizeForTopic() != -1) {
// pullThresholdSizeForTopic
if (this.defaultMQPushConsumer.getPullThresholdSizeForTopic() < 1 || this.defaultMQPushConsumer.getPullThresholdSizeForTopic() > 102400) {
throw new MQClientException(
"pullThresholdSizeForTopic Out of range [1, 102400]"
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
}
}
// pullInterval
if (this.defaultMQPushConsumer.getPullInterval() < 0 || this.defaultMQPushConsumer.getPullInterval() > 65535) {
throw new MQClientException(
......
......@@ -45,6 +45,7 @@ public class ProcessQueue {
private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
private final AtomicLong msgCount = new AtomicLong();
private final AtomicLong msgSize = new AtomicLong();
private final Lock lockConsume = new ReentrantLock();
private final TreeMap<Long, MessageExt> msgTreeMapTemp = new TreeMap<Long, MessageExt>();
private final AtomicLong tryUnlockTimes = new AtomicLong(0);
......@@ -129,6 +130,7 @@ public class ProcessQueue {
if (null == old) {
validMsgCnt++;
this.queueOffsetMax = msg.getQueueOffset();
msgSize.addAndGet(msg.getBody().length);
}
}
msgCount.addAndGet(validMsgCnt);
......@@ -189,6 +191,7 @@ public class ProcessQueue {
MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
if (prev != null) {
removedCnt--;
msgSize.addAndGet(0 - msg.getBody().length);
}
}
msgCount.addAndGet(removedCnt);
......@@ -215,6 +218,10 @@ public class ProcessQueue {
return msgCount;
}
public AtomicLong getMsgSize() {
return msgSize;
}
public boolean isDropped() {
return dropped;
}
......@@ -250,7 +257,10 @@ public class ProcessQueue {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
Long offset = this.msgTreeMapTemp.lastKey();
msgCount.addAndGet(this.msgTreeMapTemp.size() * (-1));
msgCount.addAndGet(0 - this.msgTreeMapTemp.size());
for (MessageExt msg : this.msgTreeMapTemp.values()) {
msgSize.addAndGet(0 - msg.getBody().length);
}
this.msgTreeMapTemp.clear();
if (offset != null) {
return offset + 1;
......@@ -334,6 +344,7 @@ public class ProcessQueue {
this.msgTreeMap.clear();
this.msgTreeMapTemp.clear();
this.msgCount.set(0);
this.msgSize.set(0);
this.queueOffsetMax = 0L;
} finally {
this.lockTreeMap.writeLock().unlock();
......@@ -387,6 +398,7 @@ public class ProcessQueue {
info.setCachedMsgMinOffset(this.msgTreeMap.firstKey());
info.setCachedMsgMaxOffset(this.msgTreeMap.lastKey());
info.setCachedMsgCount(this.msgTreeMap.size());
info.setCachedMsgSizeInMiB((int) (this.msgSize.get() / (1024 * 1024)));
}
if (!this.msgTreeMapTemp.isEmpty()) {
......
......@@ -57,6 +57,26 @@ public class RebalancePushImpl extends RebalanceImpl {
long newVersion = System.currentTimeMillis();
log.info("{} Rebalance changed, also update version: {}, {}", topic, subscriptionData.getSubVersion(), newVersion);
subscriptionData.setSubVersion(newVersion);
int currentQueueCount = this.processQueueTable.size();
if (currentQueueCount != 0) {
int pullThresholdForTopic = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForTopic();
if (pullThresholdForTopic != -1) {
int newVal = Math.max(1, pullThresholdForTopic / currentQueueCount);
log.info("The pullThresholdForQueue is changed from {} to {}",
this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForQueue(), newVal);
this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().setPullThresholdForQueue(newVal);
}
int pullThresholdSizeForTopic = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdSizeForTopic();
if (pullThresholdSizeForTopic != -1) {
int newVal = Math.max(1, pullThresholdSizeForTopic / currentQueueCount);
log.info("The pullThresholdSizeForQueue is changed from {} to {}",
this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdSizeForQueue(), newVal);
this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().setPullThresholdSizeForQueue(newVal);
}
}
// notify broker
this.getmQClientFactory().sendHeartbeatToAllBrokerWithLock();
}
......
......@@ -32,6 +32,7 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
......@@ -62,6 +63,7 @@ import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
......@@ -207,6 +209,59 @@ public class DefaultMQPushConsumerTest {
assertThat(messageExts[0].getBody()).isEqualTo(new byte[] {'a'});
}
@Test
public void testCheckConfig() {
DefaultMQPushConsumer pushConsumer = createPushConsumer();
pushConsumer.setPullThresholdForQueue(65535 + 1);
try {
pushConsumer.start();
failBecauseExceptionWasNotThrown(MQClientException.class);
} catch (MQClientException e) {
assertThat(e).hasMessageContaining("pullThresholdForQueue Out of range [1, 65535]");
}
pushConsumer = createPushConsumer();
pushConsumer.setPullThresholdForTopic(65535 * 100 + 1);
try {
pushConsumer.start();
failBecauseExceptionWasNotThrown(MQClientException.class);
} catch (MQClientException e) {
assertThat(e).hasMessageContaining("pullThresholdForTopic Out of range [1, 6553500]");
}
pushConsumer = createPushConsumer();
pushConsumer.setPullThresholdSizeForQueue(1024 + 1);
try {
pushConsumer.start();
failBecauseExceptionWasNotThrown(MQClientException.class);
} catch (MQClientException e) {
assertThat(e).hasMessageContaining("pullThresholdSizeForQueue Out of range [1, 1024]");
}
pushConsumer = createPushConsumer();
pushConsumer.setPullThresholdSizeForTopic(1024 * 100 + 1);
try {
pushConsumer.start();
failBecauseExceptionWasNotThrown(MQClientException.class);
} catch (MQClientException e) {
assertThat(e).hasMessageContaining("pullThresholdSizeForTopic Out of range [1, 102400]");
}
}
private DefaultMQPushConsumer createPushConsumer() {
DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(consumerGroup);
pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
return null;
}
});
return pushConsumer;
}
private PullRequest createPullRequest() {
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.impl.consumer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
@RunWith(MockitoJUnitRunner.class)
public class ProcessQueueTest {
@Test
public void testCachedMessageCount() {
ProcessQueue pq = new ProcessQueue();
pq.putMessage(createMessageList());
assertThat(pq.getMsgCount().get()).isEqualTo(100);
pq.takeMessags(10);
pq.commit();
assertThat(pq.getMsgCount().get()).isEqualTo(90);
pq.removeMessage(Collections.singletonList(pq.getMsgTreeMap().lastEntry().getValue()));
assertThat(pq.getMsgCount().get()).isEqualTo(89);
}
@Test
public void testCachedMessageSize() {
ProcessQueue pq = new ProcessQueue();
pq.putMessage(createMessageList());
assertThat(pq.getMsgSize().get()).isEqualTo(100 * 123);
pq.takeMessags(10);
pq.commit();
assertThat(pq.getMsgSize().get()).isEqualTo(90 * 123);
pq.removeMessage(Collections.singletonList(pq.getMsgTreeMap().lastEntry().getValue()));
assertThat(pq.getMsgSize().get()).isEqualTo(89 * 123);
}
@Test
public void testFillProcessQueueInfo() {
ProcessQueue pq = new ProcessQueue();
pq.putMessage(createMessageList(102400));
ProcessQueueInfo processQueueInfo = new ProcessQueueInfo();
pq.fillProcessQueueInfo(processQueueInfo);
assertThat(processQueueInfo.getCachedMsgSizeInMiB()).isEqualTo(12);
pq.takeMessags(10000);
pq.commit();
pq.fillProcessQueueInfo(processQueueInfo);
assertThat(processQueueInfo.getCachedMsgSizeInMiB()).isEqualTo(10);
pq.takeMessags(10000);
pq.commit();
pq.fillProcessQueueInfo(processQueueInfo);
assertThat(processQueueInfo.getCachedMsgSizeInMiB()).isEqualTo(9);
pq.takeMessags(80000);
pq.commit();
pq.fillProcessQueueInfo(processQueueInfo);
assertThat(processQueueInfo.getCachedMsgSizeInMiB()).isEqualTo(0);
}
private List<MessageExt> createMessageList() {
return createMessageList(100);
}
private List<MessageExt> createMessageList(int count) {
List<MessageExt> messageExtList = new ArrayList<MessageExt>();
for (int i = 0; i < count; i++) {
MessageExt messageExt = new MessageExt();
messageExt.setQueueOffset(i);
messageExt.setBody(new byte[123]);
messageExtList.add(messageExt);
}
return messageExtList;
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.impl.consumer;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class RebalancePushImplTest {
@Spy
private DefaultMQPushConsumerImpl defaultMQPushConsumer = new DefaultMQPushConsumerImpl(new DefaultMQPushConsumer("RebalancePushImplTest"), null);
@Mock
private MQClientInstance mqClientInstance;
@Mock
private OffsetStore offsetStore;
private String consumerGroup = "CID_RebalancePushImplTest";
private String topic = "TopicA";
@Test
public void testMessageQueueChanged_CountThreshold() {
RebalancePushImpl rebalancePush = new RebalancePushImpl(consumerGroup, MessageModel.CLUSTERING,
new AllocateMessageQueueAveragely(), mqClientInstance, defaultMQPushConsumer);
init(rebalancePush);
// Just set pullThresholdForQueue
defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdForQueue(1024);
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
allocateResultSet.add(new MessageQueue(topic, "BrokerA", 0));
allocateResultSet.add(new MessageQueue(topic, "BrokerA", 1));
doRebalanceForcibly(rebalancePush, allocateResultSet);
assertThat(defaultMQPushConsumer.getDefaultMQPushConsumer().getPullThresholdForQueue()).isEqualTo(1024);
// Set pullThresholdForTopic
defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdForTopic(1024);
doRebalanceForcibly(rebalancePush, allocateResultSet);
assertThat(defaultMQPushConsumer.getDefaultMQPushConsumer().getPullThresholdForQueue()).isEqualTo(512);
// Change message queue allocate result
allocateResultSet.add(new MessageQueue(topic, "BrokerA", 2));
doRebalanceForcibly(rebalancePush, allocateResultSet);
assertThat(defaultMQPushConsumer.getDefaultMQPushConsumer().getPullThresholdForQueue()).isEqualTo(341);
}
private void doRebalanceForcibly(RebalancePushImpl rebalancePush, Set<MessageQueue> allocateResultSet) {
rebalancePush.topicSubscribeInfoTable.put(topic, allocateResultSet);
rebalancePush.doRebalance(false);
rebalancePush.messageQueueChanged(topic, allocateResultSet, allocateResultSet);
}
private void init(final RebalancePushImpl rebalancePush) {
rebalancePush.getSubscriptionInner().putIfAbsent(topic, new SubscriptionData());
rebalancePush.subscriptionInner.putIfAbsent(topic, new SubscriptionData());
when(mqClientInstance.findConsumerIdList(anyString(), anyString())).thenReturn(Collections.singletonList(consumerGroup));
when(mqClientInstance.getClientId()).thenReturn(consumerGroup);
when(defaultMQPushConsumer.getOffsetStore()).thenReturn(offsetStore);
doAnswer(new Answer() {
@Override
public Object answer(final InvocationOnMock invocation) throws Throwable {
return null;
}
}).when(defaultMQPushConsumer).executePullRequestImmediately(any(PullRequest.class));
}
@Test
public void testMessageQueueChanged_SizeThreshold() {
RebalancePushImpl rebalancePush = new RebalancePushImpl(consumerGroup, MessageModel.CLUSTERING,
new AllocateMessageQueueAveragely(), mqClientInstance, defaultMQPushConsumer);
init(rebalancePush);
// Just set pullThresholdSizeForQueue
defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdSizeForQueue(1024);
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
allocateResultSet.add(new MessageQueue(topic, "BrokerA", 0));
allocateResultSet.add(new MessageQueue(topic, "BrokerA", 1));
doRebalanceForcibly(rebalancePush, allocateResultSet);
assertThat(defaultMQPushConsumer.getDefaultMQPushConsumer().getPullThresholdSizeForQueue()).isEqualTo(1024);
// Set pullThresholdSizeForTopic
defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdSizeForTopic(1024);
doRebalanceForcibly(rebalancePush, allocateResultSet);
assertThat(defaultMQPushConsumer.getDefaultMQPushConsumer().getPullThresholdSizeForQueue()).isEqualTo(512);
// Change message queue allocate result
allocateResultSet.add(new MessageQueue(topic, "BrokerA", 2));
doRebalanceForcibly(rebalancePush, allocateResultSet);
assertThat(defaultMQPushConsumer.getDefaultMQPushConsumer().getPullThresholdSizeForQueue()).isEqualTo(341);
}
@Test
public void testMessageQueueChanged_ConsumerRuntimeInfo() throws MQClientException {
RebalancePushImpl rebalancePush = new RebalancePushImpl(consumerGroup, MessageModel.CLUSTERING,
new AllocateMessageQueueAveragely(), mqClientInstance, defaultMQPushConsumer);
init(rebalancePush);
defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdSizeForQueue(1024);
defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdForQueue(1024);
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
allocateResultSet.add(new MessageQueue(topic, "BrokerA", 0));
allocateResultSet.add(new MessageQueue(topic, "BrokerA", 1));
doRebalanceForcibly(rebalancePush, allocateResultSet);
defaultMQPushConsumer.setConsumeMessageService(new ConsumeMessageConcurrentlyService(defaultMQPushConsumer, null));
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdSizeForQueue")).isEqualTo("1024");
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdForQueue")).isEqualTo("1024");
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdSizeForTopic")).isEqualTo("-1");
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdForTopic")).isEqualTo("-1");
defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdSizeForTopic(1024);
defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdForTopic(1024);
doRebalanceForcibly(rebalancePush, allocateResultSet);
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdSizeForQueue")).isEqualTo("512");
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdForQueue")).isEqualTo("512");
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdSizeForTopic")).isEqualTo("1024");
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdForTopic")).isEqualTo("1024");
// Change message queue allocate result
allocateResultSet.add(new MessageQueue(topic, "BrokerA", 2));
doRebalanceForcibly(rebalancePush, allocateResultSet);
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdSizeForQueue")).isEqualTo("341");
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdForQueue")).isEqualTo("341");
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdSizeForTopic")).isEqualTo("1024");
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdForTopic")).isEqualTo("1024");
}
}
\ No newline at end of file
......@@ -25,6 +25,7 @@ public class ProcessQueueInfo {
private long cachedMsgMinOffset;
private long cachedMsgMaxOffset;
private int cachedMsgCount;
private int cachedMsgSizeInMiB;
private long transactionMsgMinOffset;
private long transactionMsgMaxOffset;
......@@ -142,16 +143,24 @@ public class ProcessQueueInfo {
this.lastConsumeTimestamp = lastConsumeTimestamp;
}
public int getCachedMsgSizeInMiB() {
return cachedMsgSizeInMiB;
}
public void setCachedMsgSizeInMiB(final int cachedMsgSizeInMiB) {
this.cachedMsgSizeInMiB = cachedMsgSizeInMiB;
}
@Override
public String toString() {
return "ProcessQueueInfo [commitOffset=" + commitOffset + ", cachedMsgMinOffset="
+ cachedMsgMinOffset + ", cachedMsgMaxOffset=" + cachedMsgMaxOffset + ", cachedMsgCount="
+ cachedMsgCount + ", transactionMsgMinOffset=" + transactionMsgMinOffset
+ cachedMsgMinOffset + ", cachedMsgMaxOffset=" + cachedMsgMaxOffset
+ ", cachedMsgCount=" + cachedMsgCount + ", cachedMsgSizeInMiB=" + cachedMsgSizeInMiB
+ ", transactionMsgMinOffset=" + transactionMsgMinOffset
+ ", transactionMsgMaxOffset=" + transactionMsgMaxOffset + ", transactionMsgCount="
+ transactionMsgCount + ", locked=" + locked + ", tryUnlockTimes=" + tryUnlockTimes
+ ", lastLockTimestamp=" + UtilAll.timeMillisToHumanString(lastLockTimestamp) + ", droped="
+ droped + ", lastPullTimestamp=" + UtilAll.timeMillisToHumanString(lastPullTimestamp)
+ ", lastConsumeTimestamp=" + UtilAll.timeMillisToHumanString(lastConsumeTimestamp) + "]";
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册