提交 dd9979f8 编写于 作者: S Stephan Ewen

[FLINK-2601] [runtime] Install shutdown hook at the end of I/O manager initialization

This prevents possible null pointers when the JVM shuts down before the I/O manager was fully started.
上级 31aedede
......@@ -63,9 +63,6 @@ public abstract class IOManager {
/** The number of the next path to use. */
private volatile int nextPath;
/** Shutdown hook to make sure that the directories are removed on exit */
private final Thread shutdownHook;
// -------------------------------------------------------------------------
// Constructors / Destructors
// -------------------------------------------------------------------------
......@@ -96,21 +93,6 @@ public abstract class IOManager {
paths[i] = storageDir;
LOG.info("I/O manager uses directory {} for spill files.", storageDir.getAbsolutePath());
}
this.shutdownHook = new Thread("I/O manager shutdown hook") {
@Override
public void run() {
shutdown();
}
};
try {
Runtime.getRuntime().addShutdownHook(this.shutdownHook);
} catch (IllegalStateException e) {
// race, JVM is in shutdown already, we can safely ignore this
LOG.debug("Unable to add shutdown hook, shutdown already in progress", e);
} catch (Throwable t) {
LOG.warn("Error while adding shutdown hook for IOManager", t);
}
}
/**
......@@ -131,20 +113,6 @@ public abstract class IOManager {
LOG.error("IOManager failed to properly clean up temp file directory: " + path, t);
}
}
// Remove shutdown hook to prevent resource leaks, unless this is invoked by the shutdown hook itself
if (shutdownHook != Thread.currentThread()) {
try {
Runtime.getRuntime().removeShutdownHook(shutdownHook);
}
catch (IllegalStateException e) {
// race, JVM is in shutdown already, we can safely ignore this
LOG.debug("Unable to remove shutdown hook, shutdown already in progress", e);
}
catch (Throwable t) {
LOG.warn("Exception while unregistering IOManager's shutdown hook.", t);
}
}
}
/**
......
......@@ -21,8 +21,6 @@ package org.apache.flink.runtime.io.disk.iomanager;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
......@@ -46,9 +44,10 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
/** Flag to signify that the IOManager has been shut down already */
private final AtomicBoolean isShutdown = new AtomicBoolean();
/** Logging */
private static final Logger LOG = LoggerFactory.getLogger(IOManagerAsync.class);
/** Shutdown hook to make sure that the directories are removed on exit */
private final Thread shutdownHook;
// -------------------------------------------------------------------------
// Constructors / Destructors
// -------------------------------------------------------------------------
......@@ -98,6 +97,24 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
t.setUncaughtExceptionHandler(this);
t.start();
}
// install a shutdown hook that makes sure the temp directories get deleted
this.shutdownHook = new Thread("I/O manager shutdown hook") {
@Override
public void run() {
shutdown();
}
};
try {
Runtime.getRuntime().addShutdownHook(this.shutdownHook);
}
catch (IllegalStateException e) {
// race, JVM is in shutdown already, we can safely ignore this
LOG.debug("Unable to add shutdown hook, shutdown already in progress", e);
}
catch (Throwable t) {
LOG.warn("Error while adding shutdown hook for IOManager", t);
}
}
/**
......@@ -112,6 +129,20 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
return;
}
// Remove shutdown hook to prevent resource leaks, unless this is invoked by the shutdown hook itself
if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
try {
Runtime.getRuntime().removeShutdownHook(shutdownHook);
}
catch (IllegalStateException e) {
// race, JVM is in shutdown already, we can safely ignore this
LOG.debug("Unable to remove shutdown hook, shutdown already in progress", e);
}
catch (Throwable t) {
LOG.warn("Exception while unregistering IOManager's shutdown hook.", t);
}
}
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Shutting down I/O manager.");
......@@ -235,7 +266,7 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
@Override
public BufferFileSegmentReader createBufferFileSegmentReader(FileIOChannel.ID channelID, RequestDoneCallback<FileSegment> callback) throws IOException {
checkState(!isShutdown.get(), "I/O-Manger is shut down.");
checkState(!isShutdown.get(), "I/O-Manger is shut down.");
return new AsynchronousBufferFileSegmentReader(channelID, readers[channelID.getThreadNum()].requestQueue, callback);
}
......@@ -257,7 +288,7 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
*/
@Override
public BulkBlockChannelReader createBulkBlockChannelReader(FileIOChannel.ID channelID,
List<MemorySegment> targetSegments, int numBlocks) throws IOException
List<MemorySegment> targetSegments, int numBlocks) throws IOException
{
checkState(!isShutdown.get(), "I/O-Manger is shut down.");
return new AsynchronousBulkBlockReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, targetSegments, numBlocks);
......@@ -313,7 +344,7 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
try {
join(1000);
}
catch (InterruptedException e) {}
catch (InterruptedException ignored) {}
// notify all pending write requests that the thread has been shut down
IOException ioex = new IOException("IO-Manager has been closed.");
......@@ -418,7 +449,7 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
try {
join(1000);
}
catch (InterruptedException e) {}
catch (InterruptedException ignored) {}
// notify all pending write requests that the thread has been shut down
IOException ioex = new IOException("IO-Manager has been closed.");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册