diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java index 8ddd483ad3585d42ae331da147e989f8cdcc8928..ed4b83718206249dd06dba206f945366f76b72d6 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java @@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.common.ServiceThread; +import org.apache.rocketmq.common.utils.ThreadUtils; import org.slf4j.Logger; public class PullMessageService extends ServiceThread { @@ -97,6 +98,12 @@ public class PullMessageService extends ServiceThread { log.info(this.getServiceName() + " service end"); } + @Override + public void shutdown(boolean interrupt) { + super.shutdown(interrupt); + ThreadUtils.shutdownGracefully(this.scheduledExecutorService, 1000, TimeUnit.MILLISECONDS); + } + @Override public String getServiceName() { return PullMessageService.class.getSimpleName(); diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java b/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java new file mode 100644 index 0000000000000000000000000000000000000000..8c28d70027ea20100c73920ea7d3bd1e4934980e --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.utils; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.rocketmq.common.constant.LoggerName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class ThreadUtils { + private static final Logger log = LoggerFactory.getLogger(LoggerName.TOOLS_LOGGER_NAME); + + public static ExecutorService newThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, + TimeUnit unit, BlockingQueue workQueue, String processName, boolean isDaemon) { + return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, newThreadFactory(processName, isDaemon)); + } + + public static ExecutorService newSingleThreadExecutor(String processName, boolean isDaemon) { + return Executors.newSingleThreadExecutor(newThreadFactory(processName, isDaemon)); + } + + public static ScheduledExecutorService newSingleThreadScheduledExecutor(String processName, boolean isDaemon) { + return Executors.newSingleThreadScheduledExecutor(newThreadFactory(processName, isDaemon)); + } + + public static ScheduledExecutorService newFixedThreadScheduledPool(int nThreads, String processName, + boolean isDaemon) { + return Executors.newScheduledThreadPool(nThreads, newThreadFactory(processName, isDaemon)); + } + + public static ThreadFactory newThreadFactory(String processName, boolean isDaemon) { + return newGenericThreadFactory("Remoting-" + processName, isDaemon); + } + + public static ThreadFactory newGenericThreadFactory(String processName) { + return newGenericThreadFactory(processName, false); + } + + public static ThreadFactory newGenericThreadFactory(String processName, int threads) { + return newGenericThreadFactory(processName, threads, false); + } + + public static ThreadFactory newGenericThreadFactory(final String processName, final boolean isDaemon) { + return new ThreadFactory() { + private AtomicInteger threadIndex = new AtomicInteger(0); + + @Override + public Thread newThread(Runnable r) { + Thread thread = new Thread(r, String.format("%s_%d", processName, this.threadIndex.incrementAndGet())); + thread.setDaemon(isDaemon); + return thread; + } + }; + } + + public static ThreadFactory newGenericThreadFactory(final String processName, final int threads, + final boolean isDaemon) { + return new ThreadFactory() { + private AtomicInteger threadIndex = new AtomicInteger(0); + + @Override + public Thread newThread(Runnable r) { + Thread thread = new Thread(r, String.format("%s_%d_%d", processName, threads, this.threadIndex.incrementAndGet())); + thread.setDaemon(isDaemon); + return thread; + } + }; + } + + /** + * Create a new thread + * + * @param name The name of the thread + * @param runnable The work for the thread to do + * @param daemon Should the thread block JVM stop? + * @return The unstarted thread + */ + public static Thread newThread(String name, Runnable runnable, boolean daemon) { + Thread thread = new Thread(runnable, name); + thread.setDaemon(daemon); + thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { + public void uncaughtException(Thread t, Throwable e) { + log.error("Uncaught exception in thread '" + t.getName() + "':", e); + } + }); + return thread; + } + + /** + * Shutdown passed thread using isAlive and join. + * + * @param t Thread to stop + */ + public static void shutdownGracefully(final Thread t) { + shutdownGracefully(t, 0); + } + + /** + * Shutdown passed thread using isAlive and join. + * + * @param millis Pass 0 if we're to wait forever. + * @param t Thread to stop + */ + public static void shutdownGracefully(final Thread t, final long millis) { + if (t == null) + return; + while (t.isAlive()) { + try { + t.interrupt(); + t.join(millis); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + + /** + * An implementation of the graceful stop sequence recommended by + * {@link ExecutorService}. + * + * @param executor executor + * @param timeout timeout + * @param timeUnit timeUnit + */ + public static void shutdownGracefully(ExecutorService executor, long timeout, TimeUnit timeUnit) { + // Disable new tasks from being submitted. + executor.shutdown(); + try { + // Wait a while for existing tasks to terminate. + if (!executor.awaitTermination(timeout, timeUnit)) { + executor.shutdownNow(); + // Wait a while for tasks to respond to being cancelled. + if (!executor.awaitTermination(timeout, timeUnit)) { + log.warn(String.format("%s didn't terminate!", executor)); + } + } + } catch (InterruptedException ie) { + // (Re-)Cancel if current thread also interrupted. + executor.shutdownNow(); + // Preserve interrupt status. + Thread.currentThread().interrupt(); + } + } + + /** + * A constructor to stop this class being constructed. + */ + private ThreadUtils() { + // Unused + + } +}