diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvStartup.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvStartup.java index 5dd15dd532015bd26fd3e572389a8bcb30270bc0..41169c9a50a127544f442bad2b01a6456e729904 100644 --- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvStartup.java +++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvStartup.java @@ -22,7 +22,7 @@ import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.InputStream; import java.util.Properties; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.Callable; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; @@ -34,6 +34,7 @@ import org.apache.rocketmq.remoting.netty.NettyServerConfig; import org.apache.rocketmq.remoting.netty.NettySystemConfig; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.srvutil.ShutdownHookThread; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -135,24 +136,13 @@ public class FiltersrvStartup { System.exit(-3); } - Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { - private volatile boolean hasShutdown = false; - private AtomicInteger shutdownTimes = new AtomicInteger(0); - + Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable() { @Override - public void run() { - synchronized (this) { - log.info("shutdown hook was invoked, " + this.shutdownTimes.incrementAndGet()); - if (!this.hasShutdown) { - this.hasShutdown = true; - long begineTime = System.currentTimeMillis(); - controller.shutdown(); - long consumingTimeTotal = System.currentTimeMillis() - begineTime; - log.info("shutdown hook over, consuming time total(ms): " + consumingTimeTotal); - } - } + public Void call() throws Exception { + controller.shutdown(); + return null; } - }, "ShutdownHook")); + })); return controller; } catch (Throwable e) { diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java index 4fa97ad0a4543836ec8b978b88b6321f23b03289..f49d2b31ffe6dcac11fadb6262ef8aac43f73c97 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java @@ -22,7 +22,7 @@ import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.InputStream; import java.util.Properties; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.Callable; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; @@ -35,6 +35,7 @@ import org.apache.rocketmq.remoting.netty.NettyServerConfig; import org.apache.rocketmq.remoting.netty.NettySystemConfig; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.srvutil.ShutdownHookThread; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,9 +62,7 @@ public class NamesrvStartup { //PackageConflictDetect.detectFastjson(); Options options = ServerUtil.buildCommandlineOptions(new Options()); - commandLine = - ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), - new PosixParser()); + commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser()); if (null == commandLine) { System.exit(-1); return null; @@ -97,8 +96,7 @@ public class NamesrvStartup { MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig); if (null == namesrvConfig.getRocketmqHome()) { - System.out.printf("Please set the " + MixAll.ROCKETMQ_HOME_ENV - + " variable in your environment to match the location of the RocketMQ installation%n"); + System.out.printf("Please set the " + MixAll.ROCKETMQ_HOME_ENV + " variable in your environment to match the location of the RocketMQ installation%n"); System.exit(-2); } @@ -123,24 +121,13 @@ public class NamesrvStartup { System.exit(-3); } - Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { - private volatile boolean hasShutdown = false; - private AtomicInteger shutdownTimes = new AtomicInteger(0); - + Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable() { @Override - public void run() { - synchronized (this) { - log.info("shutdown hook was invoked, " + this.shutdownTimes.incrementAndGet()); - if (!this.hasShutdown) { - this.hasShutdown = true; - long begineTime = System.currentTimeMillis(); - controller.shutdown(); - long consumingTimeTotal = System.currentTimeMillis() - begineTime; - log.info("shutdown hook over, consuming time total(ms): " + consumingTimeTotal); - } - } + public Void call() throws Exception { + controller.shutdown(); + return null; } - }, "ShutdownHook")); + })); controller.start(); diff --git a/srvutil/src/main/java/org/apache/rocketmq/srvutil/ShutdownHookThread.java b/srvutil/src/main/java/org/apache/rocketmq/srvutil/ShutdownHookThread.java new file mode 100644 index 0000000000000000000000000000000000000000..11f9b2c3365a93258744450bde017e01610a6583 --- /dev/null +++ b/srvutil/src/main/java/org/apache/rocketmq/srvutil/ShutdownHookThread.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.srvutil; + +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; +import org.slf4j.Logger; + +/** + * {@link ShutdownHookThread} is the standard hook for filtersrv and namesrv modules. + * Through {@link Callable} interface, this hook can customization operations in anywhere. + */ +public class ShutdownHookThread extends Thread { + private volatile boolean hasShutdown = false; + private AtomicInteger shutdownTimes = new AtomicInteger(0); + private final Logger log; + private final Callable callback; + + /** + * Create the standard hook thread, with a call back, by using {@link Callable} interface. + * + * @param log The log instance is used in hook thread. + * @param callback The call back function. + */ + public ShutdownHookThread(Logger log, Callable callback) { + super("ShutdownHook"); + this.log = log; + this.callback = callback; + } + + /** + * Thread run method. + * Invoke when the jvm shutdown. + * 1. count the invocation times. + * 2. execute the {@link ShutdownHookThread#callback}, and time it. + */ + @Override + public void run() { + synchronized (this) { + log.info("shutdown hook was invoked, " + this.shutdownTimes.incrementAndGet() + " times."); + if (!this.hasShutdown) { + this.hasShutdown = true; + long beginTime = System.currentTimeMillis(); + try { + this.callback.call(); + } catch (Exception e) { + log.error("shutdown hook callback invoked failure.", e); + } + long consumingTimeTotal = System.currentTimeMillis() - beginTime; + log.info("shutdown hook done, consuming time total(ms): " + consumingTimeTotal); + } + } + } +}