提交 e3f4251c 编写于 作者: Y yukon

[ROCKETMQ-119] Add ThreadUtils and shutdown PullMessageService properly

上级 53b98d0d
...@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit; ...@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
public class PullMessageService extends ServiceThread { public class PullMessageService extends ServiceThread {
...@@ -97,6 +98,12 @@ public class PullMessageService extends ServiceThread { ...@@ -97,6 +98,12 @@ public class PullMessageService extends ServiceThread {
log.info(this.getServiceName() + " service end"); log.info(this.getServiceName() + " service end");
} }
@Override
public void shutdown(boolean interrupt) {
super.shutdown(interrupt);
ThreadUtils.shutdownGracefully(this.scheduledExecutorService, 1000, TimeUnit.MILLISECONDS);
}
@Override @Override
public String getServiceName() { public String getServiceName() {
return PullMessageService.class.getSimpleName(); return PullMessageService.class.getSimpleName();
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.utils;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.common.constant.LoggerName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public final class ThreadUtils {
private static final Logger log = LoggerFactory.getLogger(LoggerName.TOOLS_LOGGER_NAME);
public static ExecutorService newThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, String processName, boolean isDaemon) {
return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, newThreadFactory(processName, isDaemon));
}
public static ExecutorService newSingleThreadExecutor(String processName, boolean isDaemon) {
return Executors.newSingleThreadExecutor(newThreadFactory(processName, isDaemon));
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor(String processName, boolean isDaemon) {
return Executors.newSingleThreadScheduledExecutor(newThreadFactory(processName, isDaemon));
}
public static ScheduledExecutorService newFixedThreadScheduledPool(int nThreads, String processName,
boolean isDaemon) {
return Executors.newScheduledThreadPool(nThreads, newThreadFactory(processName, isDaemon));
}
public static ThreadFactory newThreadFactory(String processName, boolean isDaemon) {
return newGenericThreadFactory("Remoting-" + processName, isDaemon);
}
public static ThreadFactory newGenericThreadFactory(String processName) {
return newGenericThreadFactory(processName, false);
}
public static ThreadFactory newGenericThreadFactory(String processName, int threads) {
return newGenericThreadFactory(processName, threads, false);
}
public static ThreadFactory newGenericThreadFactory(final String processName, final boolean isDaemon) {
return new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, String.format("%s_%d", processName, this.threadIndex.incrementAndGet()));
thread.setDaemon(isDaemon);
return thread;
}
};
}
public static ThreadFactory newGenericThreadFactory(final String processName, final int threads,
final boolean isDaemon) {
return new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, String.format("%s_%d_%d", processName, threads, this.threadIndex.incrementAndGet()));
thread.setDaemon(isDaemon);
return thread;
}
};
}
/**
* Create a new thread
*
* @param name The name of the thread
* @param runnable The work for the thread to do
* @param daemon Should the thread block JVM stop?
* @return The unstarted thread
*/
public static Thread newThread(String name, Runnable runnable, boolean daemon) {
Thread thread = new Thread(runnable, name);
thread.setDaemon(daemon);
thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
public void uncaughtException(Thread t, Throwable e) {
log.error("Uncaught exception in thread '" + t.getName() + "':", e);
}
});
return thread;
}
/**
* Shutdown passed thread using isAlive and join.
*
* @param t Thread to stop
*/
public static void shutdownGracefully(final Thread t) {
shutdownGracefully(t, 0);
}
/**
* Shutdown passed thread using isAlive and join.
*
* @param millis Pass 0 if we're to wait forever.
* @param t Thread to stop
*/
public static void shutdownGracefully(final Thread t, final long millis) {
if (t == null)
return;
while (t.isAlive()) {
try {
t.interrupt();
t.join(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
/**
* An implementation of the graceful stop sequence recommended by
* {@link ExecutorService}.
*
* @param executor executor
* @param timeout timeout
* @param timeUnit timeUnit
*/
public static void shutdownGracefully(ExecutorService executor, long timeout, TimeUnit timeUnit) {
// Disable new tasks from being submitted.
executor.shutdown();
try {
// Wait a while for existing tasks to terminate.
if (!executor.awaitTermination(timeout, timeUnit)) {
executor.shutdownNow();
// Wait a while for tasks to respond to being cancelled.
if (!executor.awaitTermination(timeout, timeUnit)) {
log.warn(String.format("%s didn't terminate!", executor));
}
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted.
executor.shutdownNow();
// Preserve interrupt status.
Thread.currentThread().interrupt();
}
}
/**
* A constructor to stop this class being constructed.
*/
private ThreadUtils() {
// Unused
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册