diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java index 14ffefb5da78358a4bf597d36d6a23849e8cf041..afef7c105410bbb5b99bf9fc5e0f828a211c7e20 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java @@ -16,10 +16,10 @@ * limitations under the License. */ - package org.apache.flink.core.fs.local; import java.io.File; +import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; @@ -28,14 +28,15 @@ import org.apache.flink.core.fs.FSDataOutputStream; /** * The LocalDataOutputStream class is a wrapper class for a data * output stream to the local file system. - * */ public class LocalDataOutputStream extends FSDataOutputStream { + private static final int MAX_OPEN_TRIES = 3; + /** * The file output stream used to write data. */ - private FileOutputStream fos = null; + private FileOutputStream fos; /** * Constructs a new LocalDataOutputStream object from a given {@link File} object. @@ -46,26 +47,34 @@ public class LocalDataOutputStream extends FSDataOutputStream { * thrown if the data output stream cannot be created */ public LocalDataOutputStream(final File file) throws IOException { - - this.fos = new FileOutputStream(file); + // we allow multiple tries to create the file, to increase resilience against spurious I/O failures + + FileNotFoundException lastException = null; + + for (int attempt = 0; attempt < MAX_OPEN_TRIES; attempt++) { + try { + this.fos = new FileOutputStream(file); + return; + } + catch (FileNotFoundException e) { + lastException = e; + } + } + throw lastException; } - @Override public void write(final int b) throws IOException { fos.write(b); } - @Override public void write(final byte[] b, final int off, final int len) throws IOException { fos.write(b, off, len); } - @Override public void close() throws IOException { - fos.close(); } }