diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java index 6ad0fc308ecfbf11951a4f56ac778c1f58a289ec..9011117a79fc247800c3c9aa06d58c5c97bb01a8 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java @@ -254,6 +254,11 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume */ private long consumeTimeout = 15; + /** + * Maximum time to await message consuming when shutdown consumer, 0 indicates no await. + */ + private long awaitTerminationMillisWhenShutdown = 0; + /** * Interface of asynchronous transfer data */ @@ -705,7 +710,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume */ @Override public void shutdown() { - this.defaultMQPushConsumerImpl.shutdown(); + this.defaultMQPushConsumerImpl.shutdown(awaitTerminationMillisWhenShutdown); if (null != traceDispatcher) { traceDispatcher.shutdown(); } @@ -886,6 +891,14 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume this.consumeTimeout = consumeTimeout; } + public long getAwaitTerminationMillisWhenShutdown() { + return awaitTerminationMillisWhenShutdown; + } + + public void setAwaitTerminationMillisWhenShutdown(long awaitTerminationMillisWhenShutdown) { + this.awaitTerminationMillisWhenShutdown = awaitTerminationMillisWhenShutdown; + } + public TraceDispatcher getTraceDispatcher() { return traceDispatcher; } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java index 258e4dbf8777fd6c5d1f0aa0cb947da332c6064f..b37f8a635983573c28d42a6708e0ed1ffee91b92 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java @@ -45,6 +45,7 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.body.CMResult; import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; +import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.common.RemotingHelper; @@ -92,9 +93,9 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES); } - public void shutdown() { + public void shutdown(long awaitTerminateMillis) { this.scheduledExecutorService.shutdown(); - this.consumeExecutor.shutdown(); + ThreadUtils.shutdownGracefully(this.consumeExecutor, awaitTerminateMillis, TimeUnit.MILLISECONDS); this.cleanExpireMsgExecutors.shutdown(); } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java index f65567b685d723a5b65b465f0a93c363c985d450..0929916062233f4ae776e9318835e30de27a7e01 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java @@ -40,6 +40,7 @@ import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.protocol.NamespaceUtil; +import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageAccessor; @@ -96,10 +97,10 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { } } - public void shutdown() { + public void shutdown(long awaitTerminateMillis) { this.stopped = true; this.scheduledExecutorService.shutdown(); - this.consumeExecutor.shutdown(); + ThreadUtils.shutdownGracefully(this.consumeExecutor, awaitTerminateMillis, TimeUnit.MILLISECONDS); if (MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) { this.unlockAllMQ(); } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java index 0f6f3bb38af6d22e02044bdaa738d2a8d45c2e8c..5078c97883502f198d1d2f395b42b0f4ee6262bb 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java @@ -24,7 +24,7 @@ import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; public interface ConsumeMessageService { void start(); - void shutdown(); + void shutdown(long awaitTerminateMillis); void updateCorePoolSize(int corePoolSize); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index 25a81a0e755e60e968ae281d4390cff5b77edcc7..ab585ea4c98e3592e6058a123d78cfc4f48bfe3d 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -546,12 +546,16 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { } } - public synchronized void shutdown() { + public void shutdown() { + shutdown(0); + } + + public synchronized void shutdown(long awaitTerminateMillis) { switch (this.serviceState) { case CREATE_JUST: break; case RUNNING: - this.consumeMessageService.shutdown(); + this.consumeMessageService.shutdown(awaitTerminateMillis); this.persistConsumerOffset(); this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup()); this.mQClientFactory.shutdown(); @@ -625,7 +629,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this); if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; - this.consumeMessageService.shutdown(); + this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown()); throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null); diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java index e6f0e866882ec460ba4505d007fca8e0ebfd995c..9395451594043f425afc73ceb08d9d8d74c92976 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; @@ -256,6 +257,34 @@ public class DefaultMQPushConsumerTest { } } + @Test + public void testGracefulShutdown() throws InterruptedException, RemotingException, MQBrokerException, MQClientException { + final CountDownLatch countDownLatch = new CountDownLatch(1); + pushConsumer.setAwaitTerminationMillisWhenShutdown(2000); + final AtomicBoolean messageConsumedFlag = new AtomicBoolean(false); + pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() { + @Override + public ConsumeConcurrentlyStatus consumeMessage(List msgs, + ConsumeConcurrentlyContext context) { + countDownLatch.countDown(); + try { + Thread.sleep(1000); + messageConsumedFlag.set(true); + } catch (InterruptedException e) { + } + + return null; + } + })); + + PullMessageService pullMessageService = mQClientFactory.getPullMessageService(); + pullMessageService.executePullRequestImmediately(createPullRequest()); + countDownLatch.await(); + + pushConsumer.shutdown(); + assertThat(messageConsumedFlag.get()).isTrue(); + } + private DefaultMQPushConsumer createPushConsumer() { DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(consumerGroup); pushConsumer.registerMessageListener(new MessageListenerConcurrently() {