From 947526b7ce021b5cb9510a8197590ee33a90b938 Mon Sep 17 00:00:00 2001 From: Mark_Yang Date: Wed, 13 Dec 2017 05:35:52 -0600 Subject: [PATCH] [ROCKETMQ-266] Add a specific Exception message for comparing consumerThreadMax and consumerThreadMin (#147) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Can’t start consumer with a small consumerThreadMax number * Can’t start consumer with a small consumerThreadMax number * test case for ROCKETMQ-266 * Can’t start consumer with a small consumerThreadMax number * Can’t start consumer with a small consumerThreadMax number * fix merge conflict * update test case for ROCKETMQ-266 * for ROCKETMQ-266 * Trigger rebuild * Trigger rebuild * Trigger rebuild --- .../consumer/DefaultMQPushConsumerImpl.java | 11 +++- .../DefaultMQPushConsumerImplTest.java | 61 +++++++++++++++++++ 2 files changed, 70 insertions(+), 2 deletions(-) create mode 100644 client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index 72bc953f..f560376c 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -711,8 +711,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { // consumeThreadMin if (this.defaultMQPushConsumer.getConsumeThreadMin() < 1 - || this.defaultMQPushConsumer.getConsumeThreadMin() > 1000 - || this.defaultMQPushConsumer.getConsumeThreadMin() > this.defaultMQPushConsumer.getConsumeThreadMax()) { + || this.defaultMQPushConsumer.getConsumeThreadMin() > 1000) { throw new MQClientException( "consumeThreadMin Out of range [1, 1000]" + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), @@ -727,6 +726,14 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { null); } + // consumeThreadMin can't be larger than consumeThreadMax + if (this.defaultMQPushConsumer.getConsumeThreadMin() > this.defaultMQPushConsumer.getConsumeThreadMax()) { + throw new MQClientException( + "consumeThreadMin (" + this.defaultMQPushConsumer.getConsumeThreadMin() + ") " + + "is larger than consumeThreadMax (" + this.defaultMQPushConsumer.getConsumeThreadMax() + ")", + null); + } + // consumeConcurrentlyMaxSpan if (this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() < 1 || this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() > 65535) { diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java new file mode 100644 index 00000000..d4f58123 --- /dev/null +++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.impl.consumer; + +import java.util.List; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageExt; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +public class DefaultMQPushConsumerImplTest { + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void checkConfigTest() throws MQClientException { + + //test type + thrown.expect(MQClientException.class); + + //test message + thrown.expectMessage("consumeThreadMin (10) is larger than consumeThreadMax (9)"); + + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer_group"); + + consumer.setConsumeThreadMin(10); + consumer.setConsumeThreadMax(9); + + consumer.registerMessageListener(new MessageListenerConcurrently() { + public ConsumeConcurrentlyStatus consumeMessage(List msgs, + ConsumeConcurrentlyContext context) { + System.out.println(" Receive New Messages: " + msgs); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + }); + + DefaultMQPushConsumerImpl defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(consumer, null); + defaultMQPushConsumerImpl.start(); + } +} -- GitLab