提交 2c49e6b1 编写于 作者: J Jaskey 提交者: RongtongJin

[ISSUE #2085]support graceful shutdown push consumer (#2084)

* support graceful shutdown push consumer

* remains the old shutdown method of DefaultMQPushConsumerImpl

* add unit test for graceful shutdown
上级 764effef
...@@ -254,6 +254,11 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume ...@@ -254,6 +254,11 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*/ */
private long consumeTimeout = 15; private long consumeTimeout = 15;
/**
* Maximum time to await message consuming when shutdown consumer, 0 indicates no await.
*/
private long awaitTerminationMillisWhenShutdown = 0;
/** /**
* Interface of asynchronous transfer data * Interface of asynchronous transfer data
*/ */
...@@ -705,7 +710,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume ...@@ -705,7 +710,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*/ */
@Override @Override
public void shutdown() { public void shutdown() {
this.defaultMQPushConsumerImpl.shutdown(); this.defaultMQPushConsumerImpl.shutdown(awaitTerminationMillisWhenShutdown);
if (null != traceDispatcher) { if (null != traceDispatcher) {
traceDispatcher.shutdown(); traceDispatcher.shutdown();
} }
...@@ -886,6 +891,14 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume ...@@ -886,6 +891,14 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
this.consumeTimeout = consumeTimeout; this.consumeTimeout = consumeTimeout;
} }
public long getAwaitTerminationMillisWhenShutdown() {
return awaitTerminationMillisWhenShutdown;
}
public void setAwaitTerminationMillisWhenShutdown(long awaitTerminationMillisWhenShutdown) {
this.awaitTerminationMillisWhenShutdown = awaitTerminationMillisWhenShutdown;
}
public TraceDispatcher getTraceDispatcher() { public TraceDispatcher getTraceDispatcher() {
return traceDispatcher; return traceDispatcher;
} }
......
...@@ -45,6 +45,7 @@ import org.apache.rocketmq.common.message.MessageExt; ...@@ -45,6 +45,7 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.CMResult; import org.apache.rocketmq.common.protocol.body.CMResult;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingHelper;
...@@ -92,9 +93,9 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService ...@@ -92,9 +93,9 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
}, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES); }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
} }
public void shutdown() { public void shutdown(long awaitTerminateMillis) {
this.scheduledExecutorService.shutdown(); this.scheduledExecutorService.shutdown();
this.consumeExecutor.shutdown(); ThreadUtils.shutdownGracefully(this.consumeExecutor, awaitTerminateMillis, TimeUnit.MILLISECONDS);
this.cleanExpireMsgExecutors.shutdown(); this.cleanExpireMsgExecutors.shutdown();
} }
......
...@@ -40,6 +40,7 @@ import org.apache.rocketmq.common.MixAll; ...@@ -40,6 +40,7 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.protocol.NamespaceUtil; import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageAccessor;
...@@ -96,10 +97,10 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { ...@@ -96,10 +97,10 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
} }
} }
public void shutdown() { public void shutdown(long awaitTerminateMillis) {
this.stopped = true; this.stopped = true;
this.scheduledExecutorService.shutdown(); this.scheduledExecutorService.shutdown();
this.consumeExecutor.shutdown(); ThreadUtils.shutdownGracefully(this.consumeExecutor, awaitTerminateMillis, TimeUnit.MILLISECONDS);
if (MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) { if (MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
this.unlockAllMQ(); this.unlockAllMQ();
} }
......
...@@ -24,7 +24,7 @@ import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; ...@@ -24,7 +24,7 @@ import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
public interface ConsumeMessageService { public interface ConsumeMessageService {
void start(); void start();
void shutdown(); void shutdown(long awaitTerminateMillis);
void updateCorePoolSize(int corePoolSize); void updateCorePoolSize(int corePoolSize);
......
...@@ -546,12 +546,16 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -546,12 +546,16 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
} }
} }
public synchronized void shutdown() { public void shutdown() {
shutdown(0);
}
public synchronized void shutdown(long awaitTerminateMillis) {
switch (this.serviceState) { switch (this.serviceState) {
case CREATE_JUST: case CREATE_JUST:
break; break;
case RUNNING: case RUNNING:
this.consumeMessageService.shutdown(); this.consumeMessageService.shutdown(awaitTerminateMillis);
this.persistConsumerOffset(); this.persistConsumerOffset();
this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup()); this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup());
this.mQClientFactory.shutdown(); this.mQClientFactory.shutdown();
...@@ -625,7 +629,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -625,7 +629,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this); boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) { if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST; this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown(); this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup() throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null); null);
......
...@@ -25,6 +25,7 @@ import java.util.List; ...@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
...@@ -256,6 +257,34 @@ public class DefaultMQPushConsumerTest { ...@@ -256,6 +257,34 @@ public class DefaultMQPushConsumerTest {
} }
} }
@Test
public void testGracefulShutdown() throws InterruptedException, RemotingException, MQBrokerException, MQClientException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
pushConsumer.setAwaitTerminationMillisWhenShutdown(2000);
final AtomicBoolean messageConsumedFlag = new AtomicBoolean(false);
pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
countDownLatch.countDown();
try {
Thread.sleep(1000);
messageConsumedFlag.set(true);
} catch (InterruptedException e) {
}
return null;
}
}));
PullMessageService pullMessageService = mQClientFactory.getPullMessageService();
pullMessageService.executePullRequestImmediately(createPullRequest());
countDownLatch.await();
pushConsumer.shutdown();
assertThat(messageConsumedFlag.get()).isTrue();
}
private DefaultMQPushConsumer createPushConsumer() { private DefaultMQPushConsumer createPushConsumer() {
DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(consumerGroup); DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(consumerGroup);
pushConsumer.registerMessageListener(new MessageListenerConcurrently() { pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册