From 2c49e6b129c1998421f4e199e713d55cb376a0fb Mon Sep 17 00:00:00 2001 From: Jaskey Date: Fri, 12 Jun 2020 20:45:41 +0800 Subject: [PATCH] [ISSUE #2085]support graceful shutdown push consumer (#2084) * support graceful shutdown push consumer * remains the old shutdown method of DefaultMQPushConsumerImpl * add unit test for graceful shutdown --- .../consumer/DefaultMQPushConsumer.java | 15 +++++++++- .../ConsumeMessageConcurrentlyService.java | 5 ++-- .../ConsumeMessageOrderlyService.java | 5 ++-- .../impl/consumer/ConsumeMessageService.java | 2 +- .../consumer/DefaultMQPushConsumerImpl.java | 10 +++++-- .../consumer/DefaultMQPushConsumerTest.java | 29 +++++++++++++++++++ 6 files changed, 57 insertions(+), 9 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java index 6ad0fc30..9011117a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java @@ -254,6 +254,11 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume */ private long consumeTimeout = 15; + /** + * Maximum time to await message consuming when shutdown consumer, 0 indicates no await. + */ + private long awaitTerminationMillisWhenShutdown = 0; + /** * Interface of asynchronous transfer data */ @@ -705,7 +710,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume */ @Override public void shutdown() { - this.defaultMQPushConsumerImpl.shutdown(); + this.defaultMQPushConsumerImpl.shutdown(awaitTerminationMillisWhenShutdown); if (null != traceDispatcher) { traceDispatcher.shutdown(); } @@ -886,6 +891,14 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume this.consumeTimeout = consumeTimeout; } + public long getAwaitTerminationMillisWhenShutdown() { + return awaitTerminationMillisWhenShutdown; + } + + public void setAwaitTerminationMillisWhenShutdown(long awaitTerminationMillisWhenShutdown) { + this.awaitTerminationMillisWhenShutdown = awaitTerminationMillisWhenShutdown; + } + public TraceDispatcher getTraceDispatcher() { return traceDispatcher; } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java index 258e4dbf..b37f8a63 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java @@ -45,6 +45,7 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.body.CMResult; import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; +import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.common.RemotingHelper; @@ -92,9 +93,9 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES); } - public void shutdown() { + public void shutdown(long awaitTerminateMillis) { this.scheduledExecutorService.shutdown(); - this.consumeExecutor.shutdown(); + ThreadUtils.shutdownGracefully(this.consumeExecutor, awaitTerminateMillis, TimeUnit.MILLISECONDS); this.cleanExpireMsgExecutors.shutdown(); } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java index f65567b6..09299160 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java @@ -40,6 +40,7 @@ import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.protocol.NamespaceUtil; +import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageAccessor; @@ -96,10 +97,10 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { } } - public void shutdown() { + public void shutdown(long awaitTerminateMillis) { this.stopped = true; this.scheduledExecutorService.shutdown(); - this.consumeExecutor.shutdown(); + ThreadUtils.shutdownGracefully(this.consumeExecutor, awaitTerminateMillis, TimeUnit.MILLISECONDS); if (MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) { this.unlockAllMQ(); } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java index 0f6f3bb3..5078c978 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java @@ -24,7 +24,7 @@ import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; public interface ConsumeMessageService { void start(); - void shutdown(); + void shutdown(long awaitTerminateMillis); void updateCorePoolSize(int corePoolSize); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index 25a81a0e..ab585ea4 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -546,12 +546,16 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { } } - public synchronized void shutdown() { + public void shutdown() { + shutdown(0); + } + + public synchronized void shutdown(long awaitTerminateMillis) { switch (this.serviceState) { case CREATE_JUST: break; case RUNNING: - this.consumeMessageService.shutdown(); + this.consumeMessageService.shutdown(awaitTerminateMillis); this.persistConsumerOffset(); this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup()); this.mQClientFactory.shutdown(); @@ -625,7 +629,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this); if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; - this.consumeMessageService.shutdown(); + this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown()); throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null); diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java index e6f0e866..93954515 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; @@ -256,6 +257,34 @@ public class DefaultMQPushConsumerTest { } } + @Test + public void testGracefulShutdown() throws InterruptedException, RemotingException, MQBrokerException, MQClientException { + final CountDownLatch countDownLatch = new CountDownLatch(1); + pushConsumer.setAwaitTerminationMillisWhenShutdown(2000); + final AtomicBoolean messageConsumedFlag = new AtomicBoolean(false); + pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() { + @Override + public ConsumeConcurrentlyStatus consumeMessage(List msgs, + ConsumeConcurrentlyContext context) { + countDownLatch.countDown(); + try { + Thread.sleep(1000); + messageConsumedFlag.set(true); + } catch (InterruptedException e) { + } + + return null; + } + })); + + PullMessageService pullMessageService = mQClientFactory.getPullMessageService(); + pullMessageService.executePullRequestImmediately(createPullRequest()); + countDownLatch.await(); + + pushConsumer.shutdown(); + assertThat(messageConsumedFlag.get()).isTrue(); + } + private DefaultMQPushConsumer createPushConsumer() { DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(consumerGroup); pushConsumer.registerMessageListener(new MessageListenerConcurrently() { -- GitLab