提交 947526b7 编写于 作者: M Mark_Yang 提交者: von gosling

[ROCKETMQ-266] Add a specific Exception message for comparing...

[ROCKETMQ-266] Add a specific Exception message for comparing consumerThreadMax and consumerThreadMin (#147)

* Can’t start consumer with a small consumerThreadMax number

* Can’t start consumer with a small consumerThreadMax number

* test case for ROCKETMQ-266

* Can’t start consumer with a small consumerThreadMax number

* Can’t start consumer with a small consumerThreadMax number

* fix merge conflict

* update test case for ROCKETMQ-266

* for ROCKETMQ-266

* Trigger rebuild

* Trigger rebuild

* Trigger rebuild
上级 390f7358
...@@ -711,8 +711,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -711,8 +711,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
// consumeThreadMin // consumeThreadMin
if (this.defaultMQPushConsumer.getConsumeThreadMin() < 1 if (this.defaultMQPushConsumer.getConsumeThreadMin() < 1
|| this.defaultMQPushConsumer.getConsumeThreadMin() > 1000 || this.defaultMQPushConsumer.getConsumeThreadMin() > 1000) {
|| this.defaultMQPushConsumer.getConsumeThreadMin() > this.defaultMQPushConsumer.getConsumeThreadMax()) {
throw new MQClientException( throw new MQClientException(
"consumeThreadMin Out of range [1, 1000]" "consumeThreadMin Out of range [1, 1000]"
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
...@@ -727,6 +726,14 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -727,6 +726,14 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
null); null);
} }
// consumeThreadMin can't be larger than consumeThreadMax
if (this.defaultMQPushConsumer.getConsumeThreadMin() > this.defaultMQPushConsumer.getConsumeThreadMax()) {
throw new MQClientException(
"consumeThreadMin (" + this.defaultMQPushConsumer.getConsumeThreadMin() + ") "
+ "is larger than consumeThreadMax (" + this.defaultMQPushConsumer.getConsumeThreadMax() + ")",
null);
}
// consumeConcurrentlyMaxSpan // consumeConcurrentlyMaxSpan
if (this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() < 1 if (this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() < 1
|| this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() > 65535) { || this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() > 65535) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.impl.consumer;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
public class DefaultMQPushConsumerImplTest {
@Rule
public ExpectedException thrown = ExpectedException.none();
@Test
public void checkConfigTest() throws MQClientException {
//test type
thrown.expect(MQClientException.class);
//test message
thrown.expectMessage("consumeThreadMin (10) is larger than consumeThreadMax (9)");
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer_group");
consumer.setConsumeThreadMin(10);
consumer.setConsumeThreadMax(9);
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.println(" Receive New Messages: " + msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
DefaultMQPushConsumerImpl defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(consumer, null);
defaultMQPushConsumerImpl.start();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册