diff --git a/common/src/main/java/org/apache/rocketmq/common/ServiceThread.java b/common/src/main/java/org/apache/rocketmq/common/ServiceThread.java index 2417cf09dc6d326c78b93080a533f8659961d3a2..0e33418f1bd5481bb37a16335d4a1a93f0ad175d 100644 --- a/common/src/main/java/org/apache/rocketmq/common/ServiceThread.java +++ b/common/src/main/java/org/apache/rocketmq/common/ServiceThread.java @@ -27,10 +27,11 @@ public abstract class ServiceThread implements Runnable { private static final long JOIN_TIME = 90 * 1000; - protected Thread thread; + private Thread thread; protected final CountDownLatch2 waitPoint = new CountDownLatch2(1); protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false); protected volatile boolean stopped = false; + protected boolean isDaemon = false; //Make it able to restart the thread private final AtomicBoolean started = new AtomicBoolean(false); @@ -48,6 +49,7 @@ public abstract class ServiceThread implements Runnable { } stopped = false; this.thread = new Thread(this, getServiceName()); + this.thread.setDaemon(isDaemon); this.thread.start(); } @@ -56,7 +58,7 @@ public abstract class ServiceThread implements Runnable { } public void shutdown(final boolean interrupt) { - log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread); + log.info("Try to shutdown service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread); if (!started.compareAndSet(true, false)) { return; } @@ -88,12 +90,14 @@ public abstract class ServiceThread implements Runnable { return JOIN_TIME; } + @Deprecated public void stop() { this.stop(false); } + @Deprecated public void stop(final boolean interrupt) { - if (!started.get()) { + if (!started.compareAndSet(true, false)) { return; } this.stopped = true; @@ -109,7 +113,7 @@ public abstract class ServiceThread implements Runnable { } public void makeStop() { - if (!started.get()) { + if (!started.compareAndSet(true, false)) { return; } this.stopped = true; @@ -147,4 +151,12 @@ public abstract class ServiceThread implements Runnable { public boolean isStopped() { return stopped; } + + public boolean isDaemon() { + return isDaemon; + } + + public void setDaemon(boolean daemon) { + isDaemon = daemon; + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java index dbb27cd744c5e98ef4ffa87de26eb5593f5dc849..5e86b8a85fc466477b16af02adad224a44822bdf 100644 --- a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java +++ b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java @@ -120,16 +120,9 @@ public class AllocateMappedFileService extends ServiceThread { return AllocateMappedFileService.class.getSimpleName(); } + @Override public void shutdown() { - this.stopped = true; - this.thread.interrupt(); - - try { - this.thread.join(this.getJointime()); - } catch (InterruptedException e) { - log.error("Interrupted", e); - } - + super.shutdown(true); for (AllocateRequest req : this.requestTable.values()) { if (req.mappedFile != null) { log.info("delete pre allocated maped file, {}", req.mappedFile.getFileName()); diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java index 251dc9971f8613216e071d35f559053b1123f49a..c102881c1ca5e5c2f772b21ddf2175579553da92 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java @@ -90,7 +90,7 @@ public class HAConnection { this.selector = RemotingUtil.openSelector(); this.socketChannel = socketChannel; this.socketChannel.register(this.selector, SelectionKey.OP_READ); - this.thread.setDaemon(true); + this.setDaemon(true); } @Override @@ -205,7 +205,7 @@ public class HAConnection { this.selector = RemotingUtil.openSelector(); this.socketChannel = socketChannel; this.socketChannel.register(this.selector, SelectionKey.OP_WRITE); - this.thread.setDaemon(true); + this.setDaemon(true); } @Override