From dd9979f8ffc7de178968298d8c21df7315633e9b Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Tue, 1 Sep 2015 12:10:30 +0200 Subject: [PATCH] [FLINK-2601] [runtime] Install shutdown hook at the end of I/O manager initialization This prevents possible null pointers when the JVM shuts down before the I/O manager was fully started. --- .../runtime/io/disk/iomanager/IOManager.java | 32 ------------- .../io/disk/iomanager/IOManagerAsync.java | 47 +++++++++++++++---- 2 files changed, 39 insertions(+), 40 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java index 45d9b9eaf1c..0942f722aeb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java @@ -63,9 +63,6 @@ public abstract class IOManager { /** The number of the next path to use. */ private volatile int nextPath; - /** Shutdown hook to make sure that the directories are removed on exit */ - private final Thread shutdownHook; - // ------------------------------------------------------------------------- // Constructors / Destructors // ------------------------------------------------------------------------- @@ -96,21 +93,6 @@ public abstract class IOManager { paths[i] = storageDir; LOG.info("I/O manager uses directory {} for spill files.", storageDir.getAbsolutePath()); } - - this.shutdownHook = new Thread("I/O manager shutdown hook") { - @Override - public void run() { - shutdown(); - } - }; - try { - Runtime.getRuntime().addShutdownHook(this.shutdownHook); - } catch (IllegalStateException e) { - // race, JVM is in shutdown already, we can safely ignore this - LOG.debug("Unable to add shutdown hook, shutdown already in progress", e); - } catch (Throwable t) { - LOG.warn("Error while adding shutdown hook for IOManager", t); - } } /** @@ -131,20 +113,6 @@ public abstract class IOManager { LOG.error("IOManager failed to properly clean up temp file directory: " + path, t); } } - - // Remove shutdown hook to prevent resource leaks, unless this is invoked by the shutdown hook itself - if (shutdownHook != Thread.currentThread()) { - try { - Runtime.getRuntime().removeShutdownHook(shutdownHook); - } - catch (IllegalStateException e) { - // race, JVM is in shutdown already, we can safely ignore this - LOG.debug("Unable to remove shutdown hook, shutdown already in progress", e); - } - catch (Throwable t) { - LOG.warn("Exception while unregistering IOManager's shutdown hook.", t); - } - } } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java index 4401d0398cd..0db4ac63b33 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java @@ -21,8 +21,6 @@ package org.apache.flink.runtime.io.disk.iomanager; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.util.EnvironmentInformation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.lang.Thread.UncaughtExceptionHandler; @@ -46,9 +44,10 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle /** Flag to signify that the IOManager has been shut down already */ private final AtomicBoolean isShutdown = new AtomicBoolean(); - /** Logging */ - private static final Logger LOG = LoggerFactory.getLogger(IOManagerAsync.class); + /** Shutdown hook to make sure that the directories are removed on exit */ + private final Thread shutdownHook; + // ------------------------------------------------------------------------- // Constructors / Destructors // ------------------------------------------------------------------------- @@ -98,6 +97,24 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle t.setUncaughtExceptionHandler(this); t.start(); } + + // install a shutdown hook that makes sure the temp directories get deleted + this.shutdownHook = new Thread("I/O manager shutdown hook") { + @Override + public void run() { + shutdown(); + } + }; + try { + Runtime.getRuntime().addShutdownHook(this.shutdownHook); + } + catch (IllegalStateException e) { + // race, JVM is in shutdown already, we can safely ignore this + LOG.debug("Unable to add shutdown hook, shutdown already in progress", e); + } + catch (Throwable t) { + LOG.warn("Error while adding shutdown hook for IOManager", t); + } } /** @@ -112,6 +129,20 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle return; } + // Remove shutdown hook to prevent resource leaks, unless this is invoked by the shutdown hook itself + if (shutdownHook != null && shutdownHook != Thread.currentThread()) { + try { + Runtime.getRuntime().removeShutdownHook(shutdownHook); + } + catch (IllegalStateException e) { + // race, JVM is in shutdown already, we can safely ignore this + LOG.debug("Unable to remove shutdown hook, shutdown already in progress", e); + } + catch (Throwable t) { + LOG.warn("Exception while unregistering IOManager's shutdown hook.", t); + } + } + try { if (LOG.isDebugEnabled()) { LOG.debug("Shutting down I/O manager."); @@ -235,7 +266,7 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle @Override public BufferFileSegmentReader createBufferFileSegmentReader(FileIOChannel.ID channelID, RequestDoneCallback callback) throws IOException { - checkState(!isShutdown.get(), "I/O-Manger is shut down."); + checkState(!isShutdown.get(), "I/O-Manger is shut down."); return new AsynchronousBufferFileSegmentReader(channelID, readers[channelID.getThreadNum()].requestQueue, callback); } @@ -257,7 +288,7 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle */ @Override public BulkBlockChannelReader createBulkBlockChannelReader(FileIOChannel.ID channelID, - List targetSegments, int numBlocks) throws IOException + List targetSegments, int numBlocks) throws IOException { checkState(!isShutdown.get(), "I/O-Manger is shut down."); return new AsynchronousBulkBlockReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, targetSegments, numBlocks); @@ -313,7 +344,7 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle try { join(1000); } - catch (InterruptedException e) {} + catch (InterruptedException ignored) {} // notify all pending write requests that the thread has been shut down IOException ioex = new IOException("IO-Manager has been closed."); @@ -418,7 +449,7 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle try { join(1000); } - catch (InterruptedException e) {} + catch (InterruptedException ignored) {} // notify all pending write requests that the thread has been shut down IOException ioex = new IOException("IO-Manager has been closed."); -- GitLab